Puller refactor
1) Refactor pullers and add tests.
2) Add timeout to a puller.
mPullTimeoutNs is intrinsic to puller. A pull taking longer than this is
deemed failed and the data discarded.
A metric or StatsPullerManager requesting a pull should monitor the pull
and have deadlineNs. A successful pull may come later than desired due
to statsd processing delays.
3) Add unit tests to puller now that the base puller is more
complicated.
Bug: 118756964
Test: unit test
Change-Id: I0e5d47e2527391f7beef4b2d06bfd5c2f82f1179
diff --git a/cmds/statsd/Android.bp b/cmds/statsd/Android.bp
index 0114ff4..f0b751d 100644
--- a/cmds/statsd/Android.bp
+++ b/cmds/statsd/Android.bp
@@ -216,6 +216,7 @@
"tests/anomaly/AnomalyTracker_test.cpp",
"tests/ConfigManager_test.cpp",
"tests/external/puller_util_test.cpp",
+ "tests/external/StatsPuller_test.cpp",
"tests/indexed_priority_queue_test.cpp",
"tests/LogEntryMatcher_test.cpp",
"tests/LogEvent_test.cpp",
diff --git a/cmds/statsd/src/StatsService.cpp b/cmds/statsd/src/StatsService.cpp
index 50b64b9..f2a4663 100644
--- a/cmds/statsd/src/StatsService.cpp
+++ b/cmds/statsd/src/StatsService.cpp
@@ -704,7 +704,7 @@
status_t StatsService::cmd_print_pulled_metrics(int out, const Vector<String8>& args) {
int s = atoi(args[1].c_str());
vector<shared_ptr<LogEvent> > stats;
- if (mPullerManager->Pull(s, getElapsedRealtimeNs(), &stats)) {
+ if (mPullerManager->Pull(s, &stats)) {
for (const auto& it : stats) {
dprintf(out, "Pull from %d: %s\n", s, it->ToString().c_str());
}
diff --git a/cmds/statsd/src/external/PowerStatsPuller.h b/cmds/statsd/src/external/PowerStatsPuller.h
index dd5ff8f..6f15bd6 100644
--- a/cmds/statsd/src/external/PowerStatsPuller.h
+++ b/cmds/statsd/src/external/PowerStatsPuller.h
@@ -28,6 +28,8 @@
class PowerStatsPuller : public StatsPuller {
public:
PowerStatsPuller();
+
+private:
bool PullInternal(vector<std::shared_ptr<LogEvent>>* data) override;
};
diff --git a/cmds/statsd/src/external/ResourceHealthManagerPuller.h b/cmds/statsd/src/external/ResourceHealthManagerPuller.h
index ba6e6c3..f650fcc 100644
--- a/cmds/statsd/src/external/ResourceHealthManagerPuller.h
+++ b/cmds/statsd/src/external/ResourceHealthManagerPuller.h
@@ -29,6 +29,8 @@
class ResourceHealthManagerPuller : public StatsPuller {
public:
explicit ResourceHealthManagerPuller(int tagId);
+
+private:
bool PullInternal(vector<std::shared_ptr<LogEvent>>* data) override;
};
diff --git a/cmds/statsd/src/external/ResourceThermalManagerPuller.h b/cmds/statsd/src/external/ResourceThermalManagerPuller.h
index 13c675a..5313792 100644
--- a/cmds/statsd/src/external/ResourceThermalManagerPuller.h
+++ b/cmds/statsd/src/external/ResourceThermalManagerPuller.h
@@ -29,6 +29,8 @@
class ResourceThermalManagerPuller : public StatsPuller {
public:
ResourceThermalManagerPuller();
+
+private:
bool PullInternal(vector<std::shared_ptr<LogEvent>>* data) override;
};
diff --git a/cmds/statsd/src/external/StatsCompanionServicePuller.h b/cmds/statsd/src/external/StatsCompanionServicePuller.h
index a16baf0..2e13320 100644
--- a/cmds/statsd/src/external/StatsCompanionServicePuller.h
+++ b/cmds/statsd/src/external/StatsCompanionServicePuller.h
@@ -26,13 +26,13 @@
class StatsCompanionServicePuller : public StatsPuller {
public:
explicit StatsCompanionServicePuller(int tagId);
- bool PullInternal(vector<std::shared_ptr<LogEvent> >* data) override;
void SetStatsCompanionService(sp<IStatsCompanionService> statsCompanionService) override;
private:
Mutex mStatsCompanionServiceLock;
sp<IStatsCompanionService> mStatsCompanionService = nullptr;
+ bool PullInternal(vector<std::shared_ptr<LogEvent> >* data) override;
};
} // namespace statsd
diff --git a/cmds/statsd/src/external/StatsPuller.cpp b/cmds/statsd/src/external/StatsPuller.cpp
index 7043d66..c7c22ee 100644
--- a/cmds/statsd/src/external/StatsPuller.cpp
+++ b/cmds/statsd/src/external/StatsPuller.cpp
@@ -34,48 +34,52 @@
StatsPuller::StatsPuller(const int tagId)
: mTagId(tagId) {
- // Pullers can cause significant impact to system health and battery.
- // So that we don't pull too frequently.
- mCoolDownNs = StatsPullerManager::kAllPullAtomInfo.find(tagId)->second.coolDownNs;
- VLOG("Puller for tag %d created. Cooldown set to %lld", mTagId, (long long)mCoolDownNs);
}
-bool StatsPuller::Pull(const int64_t elapsedTimeNs, std::vector<std::shared_ptr<LogEvent>>* data) {
+bool StatsPuller::Pull(std::vector<std::shared_ptr<LogEvent>>* data) {
lock_guard<std::mutex> lock(mLock);
- int64_t wallClockTimeNs = getWallClockNs();
+ int64_t elapsedTimeNs = getElapsedRealtimeNs();
StatsdStats::getInstance().notePull(mTagId);
- if (elapsedTimeNs - mLastPullTimeNs < mCoolDownNs) {
- (*data) = mCachedData;
- StatsdStats::getInstance().notePullFromCache(mTagId);
- StatsdStats::getInstance().notePullDelay(mTagId, getElapsedRealtimeNs() - elapsedTimeNs);
- return true;
+ const bool shouldUseCache = elapsedTimeNs - mLastPullTimeNs <
+ StatsPullerManager::kAllPullAtomInfo.at(mTagId).coolDownNs;
+ if (shouldUseCache) {
+ if (mHasGoodData) {
+ (*data) = mCachedData;
+ StatsdStats::getInstance().notePullFromCache(mTagId);
+ }
+ return mHasGoodData;
}
- if (mMinPullIntervalNs > elapsedTimeNs - mLastPullTimeNs) {
- mMinPullIntervalNs = elapsedTimeNs - mLastPullTimeNs;
- StatsdStats::getInstance().updateMinPullIntervalSec(mTagId,
- mMinPullIntervalNs / NS_PER_SEC);
+
+ if (mLastPullTimeNs > 0) {
+ StatsdStats::getInstance().updateMinPullIntervalSec(
+ mTagId, (elapsedTimeNs - mLastPullTimeNs) / NS_PER_SEC);
}
mCachedData.clear();
mLastPullTimeNs = elapsedTimeNs;
- int64_t pullStartTimeNs = getElapsedRealtimeNs();
- bool ret = PullInternal(&mCachedData);
- if (!ret) {
- mCachedData.clear();
- return false;
+ mHasGoodData = PullInternal(&mCachedData);
+ if (!mHasGoodData) {
+ return mHasGoodData;
}
- StatsdStats::getInstance().notePullTime(mTagId, getElapsedRealtimeNs() - pullStartTimeNs);
- for (const shared_ptr<LogEvent>& data : mCachedData) {
- data->setElapsedTimestampNs(elapsedTimeNs);
- data->setLogdWallClockTimestampNs(wallClockTimeNs);
+ const int64_t pullDurationNs = getElapsedRealtimeNs() - elapsedTimeNs;
+ StatsdStats::getInstance().notePullTime(mTagId, pullDurationNs);
+ const bool pullTimeOut =
+ pullDurationNs > StatsPullerManager::kAllPullAtomInfo.at(mTagId).pullTimeoutNs;
+ if (pullTimeOut) {
+ // Something went wrong. Discard the data.
+ clearCacheLocked();
+ mHasGoodData = false;
+ StatsdStats::getInstance().notePullTimeout(mTagId);
+ ALOGW("Pull for atom %d exceeds timeout %lld nano seconds.", mTagId,
+ (long long)pullDurationNs);
+ return mHasGoodData;
}
if (mCachedData.size() > 0) {
mapAndMergeIsolatedUidsToHostUid(mCachedData, mUidMap, mTagId);
- (*data) = mCachedData;
}
- StatsdStats::getInstance().notePullDelay(mTagId, getElapsedRealtimeNs() - elapsedTimeNs);
- return ret;
+ (*data) = mCachedData;
+ return mHasGoodData;
}
int StatsPuller::ForceClearCache() {
@@ -84,6 +88,10 @@
int StatsPuller::clearCache() {
lock_guard<std::mutex> lock(mLock);
+ return clearCacheLocked();
+}
+
+int StatsPuller::clearCacheLocked() {
int ret = mCachedData.size();
mCachedData.clear();
mLastPullTimeNs = 0;
@@ -91,7 +99,8 @@
}
int StatsPuller::ClearCacheIfNecessary(int64_t timestampNs) {
- if (timestampNs - mLastPullTimeNs > mCoolDownNs) {
+ if (timestampNs - mLastPullTimeNs >
+ StatsPullerManager::kAllPullAtomInfo.at(mTagId).coolDownNs) {
return clearCache();
} else {
return 0;
diff --git a/cmds/statsd/src/external/StatsPuller.h b/cmds/statsd/src/external/StatsPuller.h
index f8ecb87..c83c4f8 100644
--- a/cmds/statsd/src/external/StatsPuller.h
+++ b/cmds/statsd/src/external/StatsPuller.h
@@ -18,7 +18,6 @@
#include <android/os/IStatsCompanionService.h>
#include <utils/RefBase.h>
-#include <utils/String16.h>
#include <mutex>
#include <vector>
#include "packages/UidMap.h"
@@ -37,10 +36,16 @@
virtual ~StatsPuller() {}
- // Pulls the data. The returned data will have elapsedTimeNs set as timeNs
- // and will have wallClockTimeNs set as current wall clock time.
- // Return true if the pull is successful.
- bool Pull(const int64_t timeNs, std::vector<std::shared_ptr<LogEvent>>* data);
+ // Pulls the most recent data.
+ // The data may be served from cache if consecutive pulls come within
+ // predefined cooldown time.
+ // Returns true if the pull was successful.
+ // Returns false when
+ // 1) the pull fails
+ // 2) pull takes longer than mPullTimeoutNs (intrinsic to puller)
+ // If a metric wants to make any change to the data, like timestamps, it
+ // should make a copy as this data may be shared with multiple metrics.
+ bool Pull(std::vector<std::shared_ptr<LogEvent>>* data);
// Clear cache immediately
int ForceClearCache();
@@ -53,29 +58,30 @@
virtual void SetStatsCompanionService(sp<IStatsCompanionService> statsCompanionService){};
protected:
- // The atom tag id this puller pulls
const int mTagId;
private:
mutable std::mutex mLock;
- // Minimum time before this puller does actual pull again.
- // If a pull request comes before cooldown, a cached version from purevious pull
- // will be returned.
- // The actual value should be determined by individual pullers.
- int64_t mCoolDownNs;
- // For puller stats
- int64_t mMinPullIntervalNs = LONG_MAX;
+ // Real puller impl.
virtual bool PullInternal(std::vector<std::shared_ptr<LogEvent>>* data) = 0;
- // Cache of data from last pull. If next request comes before cool down finishes,
- // cached data will be returned.
- std::vector<std::shared_ptr<LogEvent>> mCachedData;
+ bool mHasGoodData = false;
int64_t mLastPullTimeNs;
+ // Cache of data from last pull. If next request comes before cool down finishes,
+ // cached data will be returned.
+ // Cached data is cleared when
+ // 1) A pull fails
+ // 2) A new pull request comes after cooldown time.
+ // 3) clearCache is called.
+ std::vector<std::shared_ptr<LogEvent>> mCachedData;
+
int clearCache();
+ int clearCacheLocked();
+
static sp<UidMap> mUidMap;
};
diff --git a/cmds/statsd/src/external/StatsPullerManager.cpp b/cmds/statsd/src/external/StatsPullerManager.cpp
index c070ca3..4a716cf 100644
--- a/cmds/statsd/src/external/StatsPullerManager.cpp
+++ b/cmds/statsd/src/external/StatsPullerManager.cpp
@@ -53,195 +53,172 @@
const std::map<int, PullAtomInfo> StatsPullerManager::kAllPullAtomInfo = {
// wifi_bytes_transfer
{android::util::WIFI_BYTES_TRANSFER,
- {{2, 3, 4, 5},
- 1 * NS_PER_SEC,
- new StatsCompanionServicePuller(android::util::WIFI_BYTES_TRANSFER)}},
+ {.additiveFields = {2, 3, 4, 5},
+ .puller = new StatsCompanionServicePuller(android::util::WIFI_BYTES_TRANSFER)}},
// wifi_bytes_transfer_by_fg_bg
{android::util::WIFI_BYTES_TRANSFER_BY_FG_BG,
- {{3, 4, 5, 6},
- 1 * NS_PER_SEC,
- new StatsCompanionServicePuller(android::util::WIFI_BYTES_TRANSFER_BY_FG_BG)}},
+ {.additiveFields = {3, 4, 5, 6},
+ .puller = new StatsCompanionServicePuller(android::util::WIFI_BYTES_TRANSFER_BY_FG_BG)}},
// mobile_bytes_transfer
{android::util::MOBILE_BYTES_TRANSFER,
- {{2, 3, 4, 5},
- 1 * NS_PER_SEC,
- new StatsCompanionServicePuller(android::util::MOBILE_BYTES_TRANSFER)}},
+ {.additiveFields = {2, 3, 4, 5},
+ .puller = new StatsCompanionServicePuller(android::util::MOBILE_BYTES_TRANSFER)}},
// mobile_bytes_transfer_by_fg_bg
{android::util::MOBILE_BYTES_TRANSFER_BY_FG_BG,
- {{3, 4, 5, 6},
- 1 * NS_PER_SEC,
- new StatsCompanionServicePuller(android::util::MOBILE_BYTES_TRANSFER_BY_FG_BG)}},
+ {.additiveFields = {3, 4, 5, 6},
+ .puller =
+ new StatsCompanionServicePuller(android::util::MOBILE_BYTES_TRANSFER_BY_FG_BG)}},
// bluetooth_bytes_transfer
{android::util::BLUETOOTH_BYTES_TRANSFER,
- {{2, 3},
- 1 * NS_PER_SEC,
- new StatsCompanionServicePuller(android::util::BLUETOOTH_BYTES_TRANSFER)}},
+ {.additiveFields = {2, 3},
+ .puller = new StatsCompanionServicePuller(android::util::BLUETOOTH_BYTES_TRANSFER)}},
// kernel_wakelock
{android::util::KERNEL_WAKELOCK,
- {{}, 1 * NS_PER_SEC, new StatsCompanionServicePuller(android::util::KERNEL_WAKELOCK)}},
+ {.puller = new StatsCompanionServicePuller(android::util::KERNEL_WAKELOCK)}},
// subsystem_sleep_state
- {android::util::SUBSYSTEM_SLEEP_STATE,
- {{}, 1 * NS_PER_SEC, new SubsystemSleepStatePuller()}},
+ {android::util::SUBSYSTEM_SLEEP_STATE, {.puller = new SubsystemSleepStatePuller()}},
// on_device_power_measurement
- {android::util::ON_DEVICE_POWER_MEASUREMENT, {{}, 1 * NS_PER_SEC, new PowerStatsPuller()}},
+ {android::util::ON_DEVICE_POWER_MEASUREMENT, {.puller = new PowerStatsPuller()}},
// cpu_time_per_freq
{android::util::CPU_TIME_PER_FREQ,
- {{3}, 1 * NS_PER_SEC, new StatsCompanionServicePuller(android::util::CPU_TIME_PER_FREQ)}},
+ {.additiveFields = {3},
+ .puller = new StatsCompanionServicePuller(android::util::CPU_TIME_PER_FREQ)}},
// cpu_time_per_uid
{android::util::CPU_TIME_PER_UID,
- {{2, 3},
- 1 * NS_PER_SEC,
- new StatsCompanionServicePuller(android::util::CPU_TIME_PER_UID)}},
+ {.additiveFields = {2, 3},
+ .puller = new StatsCompanionServicePuller(android::util::CPU_TIME_PER_UID)}},
// cpu_time_per_uid_freq
// the throttling is 3sec, handled in
// frameworks/base/core/java/com/android/internal/os/KernelCpuProcReader
{android::util::CPU_TIME_PER_UID_FREQ,
- {{4},
- 1 * NS_PER_SEC,
- new StatsCompanionServicePuller(android::util::CPU_TIME_PER_UID_FREQ)}},
+ {.additiveFields = {4},
+ .puller = new StatsCompanionServicePuller(android::util::CPU_TIME_PER_UID_FREQ)}},
// cpu_active_time
// the throttling is 3sec, handled in
// frameworks/base/core/java/com/android/internal/os/KernelCpuProcReader
{android::util::CPU_ACTIVE_TIME,
- {{2}, 1 * NS_PER_SEC, new StatsCompanionServicePuller(android::util::CPU_ACTIVE_TIME)}},
+ {.additiveFields = {2},
+ .puller = new StatsCompanionServicePuller(android::util::CPU_ACTIVE_TIME)}},
// cpu_cluster_time
// the throttling is 3sec, handled in
// frameworks/base/core/java/com/android/internal/os/KernelCpuProcReader
{android::util::CPU_CLUSTER_TIME,
- {{3}, 1 * NS_PER_SEC, new StatsCompanionServicePuller(android::util::CPU_CLUSTER_TIME)}},
+ {.additiveFields = {3},
+ .puller = new StatsCompanionServicePuller(android::util::CPU_CLUSTER_TIME)}},
// wifi_activity_energy_info
{android::util::WIFI_ACTIVITY_INFO,
- {{}, 1 * NS_PER_SEC, new StatsCompanionServicePuller(android::util::WIFI_ACTIVITY_INFO)}},
+ {.puller = new StatsCompanionServicePuller(android::util::WIFI_ACTIVITY_INFO)}},
// modem_activity_info
{android::util::MODEM_ACTIVITY_INFO,
- {{}, 1 * NS_PER_SEC, new StatsCompanionServicePuller(android::util::MODEM_ACTIVITY_INFO)}},
+ {.puller = new StatsCompanionServicePuller(android::util::MODEM_ACTIVITY_INFO)}},
// bluetooth_activity_info
{android::util::BLUETOOTH_ACTIVITY_INFO,
- {{},
- 1 * NS_PER_SEC,
- new StatsCompanionServicePuller(android::util::BLUETOOTH_ACTIVITY_INFO)}},
+ {.puller = new StatsCompanionServicePuller(android::util::BLUETOOTH_ACTIVITY_INFO)}},
// system_elapsed_realtime
{android::util::SYSTEM_ELAPSED_REALTIME,
- {{},
- 1 * NS_PER_SEC,
- new StatsCompanionServicePuller(android::util::SYSTEM_ELAPSED_REALTIME)}},
+ {.pullTimeoutNs = NS_PER_SEC / 2,
+ .coolDownNs = NS_PER_SEC,
+ .puller = new StatsCompanionServicePuller(android::util::SYSTEM_ELAPSED_REALTIME)}},
// system_uptime
{android::util::SYSTEM_UPTIME,
- {{}, 1 * NS_PER_SEC, new StatsCompanionServicePuller(android::util::SYSTEM_UPTIME)}},
+ {.puller = new StatsCompanionServicePuller(android::util::SYSTEM_UPTIME)}},
// remaining_battery_capacity
{android::util::REMAINING_BATTERY_CAPACITY,
- {{},
- 1 * NS_PER_SEC,
- new ResourceHealthManagerPuller(android::util::REMAINING_BATTERY_CAPACITY)}},
+ {.puller = new ResourceHealthManagerPuller(android::util::REMAINING_BATTERY_CAPACITY)}},
// full_battery_capacity
{android::util::FULL_BATTERY_CAPACITY,
- {{},
- 1 * NS_PER_SEC,
- new ResourceHealthManagerPuller(android::util::FULL_BATTERY_CAPACITY)}},
+ {.puller = new ResourceHealthManagerPuller(android::util::FULL_BATTERY_CAPACITY)}},
// battery_voltage
{android::util::BATTERY_VOLTAGE,
- {{}, 1 * NS_PER_SEC, new ResourceHealthManagerPuller(android::util::BATTERY_VOLTAGE)}},
- // battery_level
+ {.puller = new ResourceHealthManagerPuller(android::util::BATTERY_VOLTAGE)}},
+ // battery_voltage
{android::util::BATTERY_LEVEL,
- {{}, 1 * NS_PER_SEC, new ResourceHealthManagerPuller(android::util::BATTERY_LEVEL)}},
+ {.puller = new ResourceHealthManagerPuller(android::util::BATTERY_LEVEL)}},
// process_memory_state
{android::util::PROCESS_MEMORY_STATE,
- {{4, 5, 6, 7, 8, 9},
- 1 * NS_PER_SEC,
- new StatsCompanionServicePuller(android::util::PROCESS_MEMORY_STATE)}},
+ {.additiveFields = {4, 5, 6, 7, 8, 9},
+ .puller = new StatsCompanionServicePuller(android::util::PROCESS_MEMORY_STATE)}},
// native_process_memory_state
{android::util::NATIVE_PROCESS_MEMORY_STATE,
- {{3, 4, 5, 6},
- 1 * NS_PER_SEC,
- new StatsCompanionServicePuller(android::util::NATIVE_PROCESS_MEMORY_STATE)}},
+ {.additiveFields = {3, 4, 5, 6},
+ .puller = new StatsCompanionServicePuller(android::util::NATIVE_PROCESS_MEMORY_STATE)}},
{android::util::PROCESS_MEMORY_HIGH_WATER_MARK,
- {{3},
- 1 * NS_PER_SEC,
- new StatsCompanionServicePuller(android::util::PROCESS_MEMORY_HIGH_WATER_MARK)}},
+ {.additiveFields = {3},
+ .puller =
+ new StatsCompanionServicePuller(android::util::PROCESS_MEMORY_HIGH_WATER_MARK)}},
// temperature
- {android::util::TEMPERATURE, {{}, 1 * NS_PER_SEC, new ResourceThermalManagerPuller()}},
+ {android::util::TEMPERATURE, {.puller = new ResourceThermalManagerPuller()}},
// binder_calls
{android::util::BINDER_CALLS,
- {{4, 5, 6, 8, 12},
- 1 * NS_PER_SEC,
- new StatsCompanionServicePuller(android::util::BINDER_CALLS)}},
+ {.additiveFields = {4, 5, 6, 8, 12},
+ .puller = new StatsCompanionServicePuller(android::util::BINDER_CALLS)}},
// binder_calls_exceptions
{android::util::BINDER_CALLS_EXCEPTIONS,
- {{},
- 1 * NS_PER_SEC,
- new StatsCompanionServicePuller(android::util::BINDER_CALLS_EXCEPTIONS)}},
+ {.puller = new StatsCompanionServicePuller(android::util::BINDER_CALLS_EXCEPTIONS)}},
// looper_stats
{android::util::LOOPER_STATS,
- {{5, 6, 7, 8, 9},
- 1 * NS_PER_SEC,
- new StatsCompanionServicePuller(android::util::LOOPER_STATS)}},
+ {.additiveFields = {5, 6, 7, 8, 9},
+ .puller = new StatsCompanionServicePuller(android::util::LOOPER_STATS)}},
// Disk Stats
{android::util::DISK_STATS,
- {{}, 1 * NS_PER_SEC, new StatsCompanionServicePuller(android::util::DISK_STATS)}},
+ {.puller = new StatsCompanionServicePuller(android::util::DISK_STATS)}},
// Directory usage
{android::util::DIRECTORY_USAGE,
- {{}, 1 * NS_PER_SEC, new StatsCompanionServicePuller(android::util::DIRECTORY_USAGE)}},
+ {.puller = new StatsCompanionServicePuller(android::util::DIRECTORY_USAGE)}},
// Size of app's code, data, and cache
{android::util::APP_SIZE,
- {{}, 1 * NS_PER_SEC, new StatsCompanionServicePuller(android::util::APP_SIZE)}},
+ {.puller = new StatsCompanionServicePuller(android::util::APP_SIZE)}},
// Size of specific categories of files. Eg. Music.
{android::util::CATEGORY_SIZE,
- {{}, 1 * NS_PER_SEC, new StatsCompanionServicePuller(android::util::CATEGORY_SIZE)}},
+ {.puller = new StatsCompanionServicePuller(android::util::CATEGORY_SIZE)}},
// Number of fingerprints registered to each user.
{android::util::NUM_FINGERPRINTS,
- {{}, 1 * NS_PER_SEC, new StatsCompanionServicePuller(android::util::NUM_FINGERPRINTS)}},
+ {.puller = new StatsCompanionServicePuller(android::util::NUM_FINGERPRINTS)}},
// ProcStats.
{android::util::PROC_STATS,
- {{}, 1 * NS_PER_SEC, new StatsCompanionServicePuller(android::util::PROC_STATS)}},
+ {.puller = new StatsCompanionServicePuller(android::util::PROC_STATS)}},
// ProcStatsPkgProc.
{android::util::PROC_STATS_PKG_PROC,
- {{}, 1 * NS_PER_SEC, new StatsCompanionServicePuller(android::util::PROC_STATS_PKG_PROC)}},
+ {.puller = new StatsCompanionServicePuller(android::util::PROC_STATS_PKG_PROC)}},
// Disk I/O stats per uid.
{android::util::DISK_IO,
- {{2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
- 3 * NS_PER_SEC,
- new StatsCompanionServicePuller(android::util::DISK_IO)}},
+ {.additiveFields = {2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
+ .coolDownNs = 3 * NS_PER_SEC,
+ .puller = new StatsCompanionServicePuller(android::util::DISK_IO)}},
// PowerProfile constants for power model calculations.
{android::util::POWER_PROFILE,
- {{}, 1 * NS_PER_SEC, new StatsCompanionServicePuller(android::util::POWER_PROFILE)}},
+ {.puller = new StatsCompanionServicePuller(android::util::POWER_PROFILE)}},
// Process cpu stats. Min cool-down is 5 sec, inline with what AcitivityManagerService uses.
{android::util::PROCESS_CPU_TIME,
- {{} /* additive fields */,
- 5 * NS_PER_SEC /* min cool-down in seconds*/,
- new StatsCompanionServicePuller(android::util::PROCESS_CPU_TIME)}},
+ {.coolDownNs = 5 * NS_PER_SEC /* min cool-down in seconds*/,
+ .puller = new StatsCompanionServicePuller(android::util::PROCESS_CPU_TIME)}},
{android::util::CPU_TIME_PER_THREAD_FREQ,
- {{7, 9, 11, 13, 15, 17, 19, 21},
- 1 * NS_PER_SEC,
- new StatsCompanionServicePuller(android::util::CPU_TIME_PER_THREAD_FREQ)}},
+ {.additiveFields = {7, 9, 11, 13, 15, 17, 19, 21},
+ .puller = new StatsCompanionServicePuller(android::util::CPU_TIME_PER_THREAD_FREQ)}},
// DeviceCalculatedPowerUse.
{android::util::DEVICE_CALCULATED_POWER_USE,
- {{},
- 1 * NS_PER_SEC,
- new StatsCompanionServicePuller(android::util::DEVICE_CALCULATED_POWER_USE)}},
+ {.puller = new StatsCompanionServicePuller(android::util::DEVICE_CALCULATED_POWER_USE)}},
// DeviceCalculatedPowerBlameUid.
{android::util::DEVICE_CALCULATED_POWER_BLAME_UID,
- {{}, // BatteryStats already merged isolated with host ids so it's unnecessary here.
- 1 * NS_PER_SEC,
- new StatsCompanionServicePuller(android::util::DEVICE_CALCULATED_POWER_BLAME_UID)}},
+ {.puller = new StatsCompanionServicePuller(
+ android::util::DEVICE_CALCULATED_POWER_BLAME_UID)}},
// DeviceCalculatedPowerBlameOther.
{android::util::DEVICE_CALCULATED_POWER_BLAME_OTHER,
- {{},
- 1 * NS_PER_SEC,
- new StatsCompanionServicePuller(android::util::DEVICE_CALCULATED_POWER_BLAME_OTHER)}},
+ {.puller = new StatsCompanionServicePuller(
+ android::util::DEVICE_CALCULATED_POWER_BLAME_OTHER)}},
// BuildInformation.
{android::util::BUILD_INFORMATION,
- {{}, 1 * NS_PER_SEC, new StatsCompanionServicePuller(android::util::BUILD_INFORMATION)}},
+ {.puller = new StatsCompanionServicePuller(android::util::BUILD_INFORMATION)}},
};
StatsPullerManager::StatsPullerManager() : mNextPullTimeNs(NO_ALARM_UPDATE) {
}
-bool StatsPullerManager::Pull(const int tagId, const int64_t timeNs,
- vector<shared_ptr<LogEvent>>* data) {
+bool StatsPullerManager::Pull(int tagId, vector<shared_ptr<LogEvent>>* data) {
VLOG("Initiating pulling %d", tagId);
if (kAllPullAtomInfo.find(tagId) != kAllPullAtomInfo.end()) {
- bool ret = kAllPullAtomInfo.find(tagId)->second.puller->Pull(timeNs, data);
+ bool ret = kAllPullAtomInfo.find(tagId)->second.puller->Pull(data);
VLOG("pulled %d items", (int)data->size());
return ret;
} else {
@@ -333,8 +310,9 @@
}
}
-void StatsPullerManager::OnAlarmFired(const int64_t currentTimeNs) {
+void StatsPullerManager::OnAlarmFired(int64_t elapsedTimeNs) {
AutoMutex _l(mLock);
+ int64_t wallClockNs = getWallClockNs();
int64_t minNextPullTimeNs = NO_ALARM_UPDATE;
@@ -344,7 +322,7 @@
vector<ReceiverInfo*> receivers = vector<ReceiverInfo*>();
if (pair.second.size() != 0) {
for (ReceiverInfo& receiverInfo : pair.second) {
- if (receiverInfo.nextPullTimeNs <= currentTimeNs) {
+ if (receiverInfo.nextPullTimeNs <= elapsedTimeNs) {
receivers.push_back(&receiverInfo);
} else {
if (receiverInfo.nextPullTimeNs < minNextPullTimeNs) {
@@ -360,22 +338,38 @@
for (const auto& pullInfo : needToPull) {
vector<shared_ptr<LogEvent>> data;
- if (Pull(pullInfo.first, currentTimeNs, &data)) {
- for (const auto& receiverInfo : pullInfo.second) {
- sp<PullDataReceiver> receiverPtr = receiverInfo->receiver.promote();
- if (receiverPtr != nullptr) {
- receiverPtr->onDataPulled(data);
- // we may have just come out of a coma, compute next pull time
- receiverInfo->nextPullTimeNs =
- (currentTimeNs - receiverInfo->nextPullTimeNs) /
- receiverInfo->intervalNs * receiverInfo->intervalNs +
- receiverInfo->intervalNs + receiverInfo->nextPullTimeNs;
- if (receiverInfo->nextPullTimeNs < minNextPullTimeNs) {
- minNextPullTimeNs = receiverInfo->nextPullTimeNs;
- }
- } else {
- VLOG("receiver already gone.");
+ if (!Pull(pullInfo.first, &data)) {
+ VLOG("pull failed at %lld, will try again later", (long long)elapsedTimeNs);
+ continue;
+ }
+ StatsdStats::getInstance().notePullDelay(pullInfo.first,
+ getElapsedRealtimeNs() - elapsedTimeNs);
+
+ // Convention is to mark pull atom timestamp at request time.
+ // If we pull at t0, puller starts at t1, finishes at t2, and send back
+ // at t3, we mark t0 as its timestamp, which should correspond to its
+ // triggering event, such as condition change at t0.
+ // Here the triggering event is alarm fired from AlarmManager.
+ // In ValueMetricProducer and GaugeMetricProducer we do same thing
+ // when pull on condition change, etc.
+ for (auto& event : data) {
+ event->setElapsedTimestampNs(elapsedTimeNs);
+ event->setLogdWallClockTimestampNs(wallClockNs);
+ }
+
+ for (const auto& receiverInfo : pullInfo.second) {
+ sp<PullDataReceiver> receiverPtr = receiverInfo->receiver.promote();
+ if (receiverPtr != nullptr) {
+ receiverPtr->onDataPulled(data);
+ // we may have just come out of a coma, compute next pull time
+ int numBucketsAhead =
+ (elapsedTimeNs - receiverInfo->nextPullTimeNs) / receiverInfo->intervalNs;
+ receiverInfo->nextPullTimeNs += (numBucketsAhead + 1) * receiverInfo->intervalNs;
+ if (receiverInfo->nextPullTimeNs < minNextPullTimeNs) {
+ minNextPullTimeNs = receiverInfo->nextPullTimeNs;
}
+ } else {
+ VLOG("receiver already gone.");
}
}
}
diff --git a/cmds/statsd/src/external/StatsPullerManager.h b/cmds/statsd/src/external/StatsPullerManager.h
index 3350736..807e4af 100644
--- a/cmds/statsd/src/external/StatsPullerManager.h
+++ b/cmds/statsd/src/external/StatsPullerManager.h
@@ -26,6 +26,7 @@
#include <vector>
#include "PullDataReceiver.h"
#include "StatsPuller.h"
+#include "guardrail/StatsdStats.h"
#include "logd/LogEvent.h"
namespace android {
@@ -36,11 +37,19 @@
// The field numbers of the fields that need to be summed when merging
// isolated uid with host uid.
std::vector<int> additiveFields;
- // How long should the puller wait before doing an actual pull again. Default
- // 1 sec. Set this to 0 if this is handled elsewhere.
+ // Minimum time before this puller does actual pull again.
+ // Pullers can cause significant impact to system health and battery.
+ // So that we don't pull too frequently.
+ // If a pull request comes before cooldown, a cached version from previous pull
+ // will be returned.
int64_t coolDownNs = 1 * NS_PER_SEC;
// The actual puller
sp<StatsPuller> puller;
+ // Max time allowed to pull this atom.
+ // We cannot reliably kill a pull thread. So we don't terminate the puller.
+ // The data is discarded if the pull takes longer than this and mHasGoodData
+ // marked as false.
+ int64_t pullTimeoutNs = StatsdStats::kPullMaxDelayNs;
} PullAtomInfo;
class StatsPullerManager : public virtual RefBase {
@@ -61,13 +70,18 @@
// Verify if we know how to pull for this matcher
bool PullerForMatcherExists(int tagId) const;
- void OnAlarmFired(const int64_t timeNs);
+ void OnAlarmFired(int64_t elapsedTimeNs);
- // Use respective puller to pull the data. The returned data will have
- // elapsedTimeNs set as timeNs and will have wallClockTimeNs set as current
- // wall clock time.
- virtual bool Pull(const int tagId, const int64_t timeNs,
- vector<std::shared_ptr<LogEvent>>* data);
+ // Pulls the most recent data.
+ // The data may be served from cache if consecutive pulls come within
+ // mCoolDownNs.
+ // Returns true if the pull was successful.
+ // Returns false when
+ // 1) the pull fails
+ // 2) pull takes longer than mPullTimeoutNs (intrinsic to puller)
+ // If the metric wants to make any change to the data, like timestamps, they
+ // should make a copy as this data may be shared with multiple metrics.
+ virtual bool Pull(int tagId, vector<std::shared_ptr<LogEvent>>* data);
// Clear pull data cache immediately.
int ForceClearPullerCache();
diff --git a/cmds/statsd/src/external/SubsystemSleepStatePuller.h b/cmds/statsd/src/external/SubsystemSleepStatePuller.h
index 17ce5b4cb..87f5f02 100644
--- a/cmds/statsd/src/external/SubsystemSleepStatePuller.h
+++ b/cmds/statsd/src/external/SubsystemSleepStatePuller.h
@@ -29,6 +29,8 @@
class SubsystemSleepStatePuller : public StatsPuller {
public:
SubsystemSleepStatePuller();
+
+private:
bool PullInternal(vector<std::shared_ptr<LogEvent>>* data) override;
};
diff --git a/cmds/statsd/src/guardrail/StatsdStats.cpp b/cmds/statsd/src/guardrail/StatsdStats.cpp
index 3e5e82f..f4d0144 100644
--- a/cmds/statsd/src/guardrail/StatsdStats.cpp
+++ b/cmds/statsd/src/guardrail/StatsdStats.cpp
@@ -373,6 +373,16 @@
mPulledAtomStats[pullAtomId].dataError++;
}
+void StatsdStats::notePullTimeout(int pullAtomId) {
+ lock_guard<std::mutex> lock(mLock);
+ mPulledAtomStats[pullAtomId].pullTimeout++;
+}
+
+void StatsdStats::notePullExceedMaxDelay(int pullAtomId) {
+ lock_guard<std::mutex> lock(mLock);
+ mPulledAtomStats[pullAtomId].pullExceedMaxDelay++;
+}
+
void StatsdStats::noteAtomLogged(int atomId, int32_t timeSec) {
lock_guard<std::mutex> lock(mLock);
@@ -429,6 +439,8 @@
pullStats.second.maxPullDelayNs = 0;
pullStats.second.numPullDelay = 0;
pullStats.second.dataError = 0;
+ pullStats.second.pullTimeout = 0;
+ pullStats.second.pullExceedMaxDelay = 0;
}
}
@@ -535,13 +547,16 @@
dprintf(out, "********Pulled Atom stats***********\n");
for (const auto& pair : mPulledAtomStats) {
dprintf(out,
- "Atom %d->(total pull)%ld, (pull from cache)%ld, (min pull interval)%ld, (average "
- "pull time nanos)%lld, (max pull time nanos)%lld, (average pull delay nanos)%lld, "
- "(max pull delay nanos)%lld, (data error)%ld\n",
+ "Atom %d->(total pull)%ld, (pull from cache)%ld, (min pull interval)%ld \n"
+ " (average pull time nanos)%lld, (max pull time nanos)%lld, (average pull delay "
+ "nanos)%lld, "
+ " (max pull delay nanos)%lld, (data error)%ld\n"
+ " (pull timeout)%ld, (pull exceed max delay)%ld\n",
(int)pair.first, (long)pair.second.totalPull, (long)pair.second.totalPullFromCache,
(long)pair.second.minPullIntervalSec, (long long)pair.second.avgPullTimeNs,
(long long)pair.second.maxPullTimeNs, (long long)pair.second.avgPullDelayNs,
- (long long)pair.second.maxPullDelayNs, pair.second.dataError);
+ (long long)pair.second.maxPullDelayNs, pair.second.dataError,
+ pair.second.pullTimeout, pair.second.pullExceedMaxDelay);
}
if (mAnomalyAlarmRegisteredStats > 0) {
diff --git a/cmds/statsd/src/guardrail/StatsdStats.h b/cmds/statsd/src/guardrail/StatsdStats.h
index 3157037..dc647f8 100644
--- a/cmds/statsd/src/guardrail/StatsdStats.h
+++ b/cmds/statsd/src/guardrail/StatsdStats.h
@@ -144,6 +144,8 @@
// How long to try to clear puller cache from last time
static const long kPullerCacheClearIntervalSec = 1;
+ // Max time to do a pull.
+ static const int64_t kPullMaxDelayNs = 10 * NS_PER_SEC;
/**
* Report a new config has been received and report the static stats about the config.
*
@@ -296,6 +298,16 @@
void notePullDelay(int pullAtomId, int64_t pullDelayNs);
/*
+ * Records pull exceeds timeout for the puller.
+ */
+ void notePullTimeout(int pullAtomId);
+
+ /*
+ * Records pull exceeds max delay for a metric.
+ */
+ void notePullExceedMaxDelay(int pullAtomId);
+
+ /*
* Records when system server restarts.
*/
void noteSystemServerRestart(int32_t timeSec);
@@ -335,6 +347,8 @@
int64_t maxPullDelayNs = 0;
long numPullDelay = 0;
long dataError = 0;
+ long pullTimeout = 0;
+ long pullExceedMaxDelay = 0;
} PulledAtomStats;
private:
diff --git a/cmds/statsd/src/logd/LogEvent.cpp b/cmds/statsd/src/logd/LogEvent.cpp
index 8d61aba..2ff8aa1 100644
--- a/cmds/statsd/src/logd/LogEvent.cpp
+++ b/cmds/statsd/src/logd/LogEvent.cpp
@@ -41,6 +41,14 @@
}
}
+LogEvent::LogEvent(const LogEvent& event) {
+ mTagId = event.mTagId;
+ mLogUid = event.mLogUid;
+ mElapsedTimestampNs = event.mElapsedTimestampNs;
+ mLogdTimestampNs = event.mLogdTimestampNs;
+ mValues = event.mValues;
+}
+
LogEvent::LogEvent(const StatsLogEventWrapper& statsLogEventWrapper, int workChainIndex) {
mTagId = statsLogEventWrapper.getTagId();
mLogdTimestampNs = statsLogEventWrapper.getWallClockTimeNs();
diff --git a/cmds/statsd/src/logd/LogEvent.h b/cmds/statsd/src/logd/LogEvent.h
index 4e37e9b..43e6e4f 100644
--- a/cmds/statsd/src/logd/LogEvent.h
+++ b/cmds/statsd/src/logd/LogEvent.h
@@ -207,10 +207,13 @@
return &mValues;
}
+ inline LogEvent makeCopy() {
+ return LogEvent(*this);
+ }
+
private:
/**
- * Don't copy, it's slower. If we really need this we can add it but let's try to
- * avoid it.
+ * Only use this if copy is absolutely needed.
*/
LogEvent(const LogEvent&);
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
index ec60244..67a1a47 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
@@ -68,15 +68,12 @@
const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 7;
const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 8;
-GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric& metric,
- const int conditionIndex,
- const sp<ConditionWizard>& wizard,
- const int whatMatcherIndex,
- const sp<EventMatcherWizard>& matcherWizard,
- const int pullTagId,
- const int triggerAtomId, const int atomId,
- const int64_t timeBaseNs, const int64_t startTimeNs,
- const sp<StatsPullerManager>& pullerManager)
+GaugeMetricProducer::GaugeMetricProducer(
+ const ConfigKey& key, const GaugeMetric& metric, const int conditionIndex,
+ const sp<ConditionWizard>& wizard, const int whatMatcherIndex,
+ const sp<EventMatcherWizard>& matcherWizard, const int pullTagId, const int triggerAtomId,
+ const int atomId, const int64_t timeBaseNs, const int64_t startTimeNs,
+ const sp<StatsPullerManager>& pullerManager)
: MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, wizard),
mWhatMatcherIndex(whatMatcherIndex),
mEventMatcherWizard(matcherWizard),
@@ -86,6 +83,8 @@
mAtomId(atomId),
mIsPulled(pullTagId != -1),
mMinBucketSizeNs(metric.min_bucket_size_nanos()),
+ mMaxPullDelayNs(metric.max_pull_delay_sec() > 0 ? metric.max_pull_delay_sec() * NS_PER_SEC
+ : StatsdStats::kPullMaxDelayNs),
mDimensionSoftLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) !=
StatsdStats::kAtomDimensionKeySizeLimitMap.end()
? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).first
@@ -338,14 +337,24 @@
return;
}
vector<std::shared_ptr<LogEvent>> allData;
- if (!mPullerManager->Pull(mPullTagId, timestampNs, &allData)) {
+ if (!mPullerManager->Pull(mPullTagId, &allData)) {
ALOGE("Gauge Stats puller failed for tag: %d at %lld", mPullTagId, (long long)timestampNs);
return;
}
+ const int64_t pullDelayNs = getElapsedRealtimeNs() - timestampNs;
+ if (pullDelayNs > mMaxPullDelayNs) {
+ ALOGE("Pull finish too late for atom %d", mPullTagId);
+ StatsdStats::getInstance().notePullExceedMaxDelay(mPullTagId);
+ StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
+ return;
+ }
+ StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
for (const auto& data : allData) {
- if (mEventMatcherWizard->matchLogEvent(
- *data, mWhatMatcherIndex) == MatchingState::kMatched) {
- onMatchedLogEventLocked(mWhatMatcherIndex, *data);
+ LogEvent localCopy = data->makeCopy();
+ localCopy.setElapsedTimestampNs(timestampNs);
+ if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) ==
+ MatchingState::kMatched) {
+ onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
}
}
}
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.h b/cmds/statsd/src/metrics/GaugeMetricProducer.h
index 6e3530b..a1a5061 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.h
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.h
@@ -160,6 +160,8 @@
GaugeMetric::SamplingType mSamplingType;
+ const int64_t mMaxPullDelayNs;
+
// apply a whitelist on the original input
std::shared_ptr<vector<FieldValue>> getGaugeFields(const LogEvent& event);
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index cf56e2d..9a8e3bd 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -103,7 +103,9 @@
mValueDirection(metric.value_direction()),
mSkipZeroDiffOutput(metric.skip_zero_diff_output()),
mUseZeroDefaultBase(metric.use_zero_default_base()),
- mHasGlobalBase(false) {
+ mHasGlobalBase(false),
+ mMaxPullDelayNs(metric.max_pull_delay_sec() > 0 ? metric.max_pull_delay_sec() * NS_PER_SEC
+ : StatsdStats::kPullMaxDelayNs) {
int64_t bucketSizeMills = 0;
if (metric.has_bucket()) {
bucketSizeMills = TimeUnitToBucketSizeInMillisGuardrailed(key.GetUid(), metric.bucket());
@@ -340,19 +342,32 @@
void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) {
vector<std::shared_ptr<LogEvent>> allData;
- if (mPullerManager->Pull(mPullTagId, timestampNs, &allData)) {
- for (const auto& data : allData) {
- if (mEventMatcherWizard->matchLogEvent(
- *data, mWhatMatcherIndex) == MatchingState::kMatched) {
- onMatchedLogEventLocked(mWhatMatcherIndex, *data);
- }
- }
- mHasGlobalBase = true;
- } else {
- // for pulled data, every pull is needed. So we reset the base if any
- // pull fails.
+ if (!mPullerManager->Pull(mPullTagId, &allData)) {
+ ALOGE("Gauge Stats puller failed for tag: %d at %lld", mPullTagId, (long long)timestampNs);
resetBase();
+ return;
}
+ const int64_t pullDelayNs = getElapsedRealtimeNs() - timestampNs;
+ if (pullDelayNs > mMaxPullDelayNs) {
+ ALOGE("Pull finish too late for atom %d, longer than %lld", mPullTagId,
+ (long long)mMaxPullDelayNs);
+ StatsdStats::getInstance().notePullExceedMaxDelay(mPullTagId);
+ StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
+ resetBase();
+ return;
+ }
+ StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
+
+ for (const auto& data : allData) {
+ // make a copy before doing and changes
+ LogEvent localCopy = data->makeCopy();
+ localCopy.setElapsedTimestampNs(timestampNs);
+ if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) ==
+ MatchingState::kMatched) {
+ onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
+ }
+ }
+ mHasGlobalBase = true;
}
int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTimeNs) {
@@ -381,10 +396,11 @@
return;
}
for (const auto& data : allData) {
- if (mEventMatcherWizard->matchLogEvent(*data, mWhatMatcherIndex) ==
+ LogEvent localCopy = data->makeCopy();
+ if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) ==
MatchingState::kMatched) {
- data->setElapsedTimestampNs(bucketEndTime);
- onMatchedLogEventLocked(mWhatMatcherIndex, *data);
+ localCopy.setElapsedTimestampNs(bucketEndTime);
+ onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
}
}
mHasGlobalBase = true;
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h
index 4991af4..4865aee 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.h
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.h
@@ -183,6 +183,8 @@
// diff against.
bool mHasGlobalBase;
+ const int64_t mMaxPullDelayNs;
+
FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsNoCondition);
FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsWithFiltering);
FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsTakeAbsoluteValueOnReset);
@@ -207,6 +209,8 @@
FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBase);
FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures);
FRIEND_TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey);
+ FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFail);
+ FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullTooLate);
};
} // namespace statsd
diff --git a/cmds/statsd/src/shell/ShellSubscriber.cpp b/cmds/statsd/src/shell/ShellSubscriber.cpp
index dffff7a..22883f3 100644
--- a/cmds/statsd/src/shell/ShellSubscriber.cpp
+++ b/cmds/statsd/src/shell/ShellSubscriber.cpp
@@ -131,8 +131,7 @@
VLOG("pull atom %d now", pullInfo.mPullerMatcher.atom_id());
vector<std::shared_ptr<LogEvent>> data;
- mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), nowMillis * 1000000L,
- &data);
+ mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), &data);
VLOG("pulled %zu atoms", data.size());
if (data.size() > 0) {
writeToOutputLocked(data, pullInfo.mPullerMatcher);
diff --git a/cmds/statsd/src/stats_log.proto b/cmds/statsd/src/stats_log.proto
index 5a87e46..e8de875 100644
--- a/cmds/statsd/src/stats_log.proto
+++ b/cmds/statsd/src/stats_log.proto
@@ -401,6 +401,8 @@
optional int64 average_pull_delay_nanos = 7;
optional int64 max_pull_delay_nanos = 8;
optional int64 data_error = 9;
+ optional int64 pull_timeout = 10;
+ optional int64 pull_exceed_max_delay = 11;
}
repeated PulledAtomStats pulled_atom_stats = 10;
diff --git a/cmds/statsd/src/stats_log_util.cpp b/cmds/statsd/src/stats_log_util.cpp
index f1310db..7de0bb3 100644
--- a/cmds/statsd/src/stats_log_util.cpp
+++ b/cmds/statsd/src/stats_log_util.cpp
@@ -64,6 +64,8 @@
const int FIELD_ID_AVERAGE_PULL_DELAY_NANOS = 7;
const int FIELD_ID_MAX_PULL_DELAY_NANOS = 8;
const int FIELD_ID_DATA_ERROR = 9;
+const int FIELD_ID_PULL_TIMEOUT = 10;
+const int FIELD_ID_PULL_EXCEED_MAX_DELAY = 11;
namespace {
@@ -450,6 +452,10 @@
protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_MAX_PULL_DELAY_NANOS,
(long long)pair.second.maxPullDelayNs);
protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_DATA_ERROR, (long long)pair.second.dataError);
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_PULL_TIMEOUT,
+ (long long)pair.second.pullTimeout);
+ protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_PULL_EXCEED_MAX_DELAY,
+ (long long)pair.second.pullExceedMaxDelay);
protoOutput->end(token);
}
diff --git a/cmds/statsd/src/statsd_config.proto b/cmds/statsd/src/statsd_config.proto
index f485185..381ac32 100644
--- a/cmds/statsd/src/statsd_config.proto
+++ b/cmds/statsd/src/statsd_config.proto
@@ -240,7 +240,10 @@
optional SamplingType sampling_type = 9 [default = RANDOM_ONE_SAMPLE] ;
optional int64 min_bucket_size_nanos = 10;
+
optional int64 max_num_gauge_atoms_per_bucket = 11 [default = 10];
+
+ optional int32 max_pull_delay_sec = 13 [default = 10];
}
message ValueMetric {
@@ -285,6 +288,8 @@
optional ValueDirection value_direction = 13 [default = INCREASING];
optional bool skip_zero_diff_output = 14 [default = true];
+
+ optional int32 max_pull_delay_sec = 16 [default = 10];
}
message Alert {
diff --git a/cmds/statsd/tests/e2e/GaugeMetric_e2e_pull_test.cpp b/cmds/statsd/tests/e2e/GaugeMetric_e2e_pull_test.cpp
index 2d090e0..d5c358d 100644
--- a/cmds/statsd/tests/e2e/GaugeMetric_e2e_pull_test.cpp
+++ b/cmds/statsd/tests/e2e/GaugeMetric_e2e_pull_test.cpp
@@ -48,6 +48,7 @@
*gaugeMetric->mutable_dimensions_in_what() =
CreateDimensions(android::util::TEMPERATURE, {2/* sensor name field */ });
gaugeMetric->set_bucket(FIVE_MINUTES);
+ gaugeMetric->set_max_pull_delay_sec(INT_MAX);
config.set_hash_strings_in_metric_report(false);
return config;
@@ -57,7 +58,7 @@
TEST(GaugeMetricE2eTest, TestRandomSamplePulledEvents) {
auto config = CreateStatsdConfig(GaugeMetric::RANDOM_ONE_SAMPLE);
- int64_t baseTimeNs = 10 * NS_PER_SEC;
+ int64_t baseTimeNs = getElapsedRealtimeNs();
int64_t configAddedTimeNs = 10 * 60 * NS_PER_SEC + baseTimeNs;
int64_t bucketSizeNs =
TimeUnitToBucketSizeInMillis(config.gauge_metric(0).bucket()) * 1000000;
@@ -202,7 +203,7 @@
TEST(GaugeMetricE2eTest, TestConditionChangeToTrueSamplePulledEvents) {
auto config = CreateStatsdConfig(GaugeMetric::CONDITION_CHANGE_TO_TRUE);
- int64_t baseTimeNs = 10 * NS_PER_SEC;
+ int64_t baseTimeNs = getElapsedRealtimeNs();
int64_t configAddedTimeNs = 10 * 60 * NS_PER_SEC + baseTimeNs;
int64_t bucketSizeNs =
TimeUnitToBucketSizeInMillis(config.gauge_metric(0).bucket()) * 1000000;
@@ -303,7 +304,7 @@
TEST(GaugeMetricE2eTest, TestRandomSamplePulledEvent_LateAlarm) {
auto config = CreateStatsdConfig(GaugeMetric::RANDOM_ONE_SAMPLE);
- int64_t baseTimeNs = 10 * NS_PER_SEC;
+ int64_t baseTimeNs = getElapsedRealtimeNs();
int64_t configAddedTimeNs = 10 * 60 * NS_PER_SEC + baseTimeNs;
int64_t bucketSizeNs =
TimeUnitToBucketSizeInMillis(config.gauge_metric(0).bucket()) * 1000000;
diff --git a/cmds/statsd/tests/e2e/ValueMetric_pull_e2e_test.cpp b/cmds/statsd/tests/e2e/ValueMetric_pull_e2e_test.cpp
index abf1ab1..cab6eac 100644
--- a/cmds/statsd/tests/e2e/ValueMetric_pull_e2e_test.cpp
+++ b/cmds/statsd/tests/e2e/ValueMetric_pull_e2e_test.cpp
@@ -50,6 +50,7 @@
valueMetric->set_bucket(FIVE_MINUTES);
valueMetric->set_use_absolute_value_on_reset(true);
valueMetric->set_skip_zero_diff_output(false);
+ valueMetric->set_max_pull_delay_sec(INT_MAX);
return config;
}
@@ -57,7 +58,7 @@
TEST(ValueMetricE2eTest, TestPulledEvents) {
auto config = CreateStatsdConfig();
- int64_t baseTimeNs = 10 * NS_PER_SEC;
+ int64_t baseTimeNs = getElapsedRealtimeNs();
int64_t configAddedTimeNs = 10 * 60 * NS_PER_SEC + baseTimeNs;
int64_t bucketSizeNs =
TimeUnitToBucketSizeInMillis(config.value_metric(0).bucket()) * 1000000;
@@ -163,7 +164,7 @@
TEST(ValueMetricE2eTest, TestPulledEvents_LateAlarm) {
auto config = CreateStatsdConfig();
- int64_t baseTimeNs = 10 * NS_PER_SEC;
+ int64_t baseTimeNs = getElapsedRealtimeNs();
int64_t configAddedTimeNs = 10 * 60 * NS_PER_SEC + baseTimeNs;
int64_t bucketSizeNs =
TimeUnitToBucketSizeInMillis(config.value_metric(0).bucket()) * 1000000;
diff --git a/cmds/statsd/tests/external/StatsPuller_test.cpp b/cmds/statsd/tests/external/StatsPuller_test.cpp
new file mode 100644
index 0000000..76e2097
--- /dev/null
+++ b/cmds/statsd/tests/external/StatsPuller_test.cpp
@@ -0,0 +1,227 @@
+// Copyright (C) 2018 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+#include <stdio.h>
+#include <chrono>
+#include <thread>
+#include <vector>
+#include "../metrics/metrics_test_helper.h"
+#include "src/stats_log_util.h"
+#include "tests/statsd_test_util.h"
+
+#ifdef __ANDROID__
+
+namespace android {
+namespace os {
+namespace statsd {
+
+using namespace testing;
+using std::make_shared;
+using std::shared_ptr;
+using std::vector;
+using std::this_thread::sleep_for;
+using testing::Contains;
+
+// cooldown time 1sec.
+int pullTagId = 10014;
+
+bool pullSuccess;
+vector<std::shared_ptr<LogEvent>> pullData;
+long pullDelayNs;
+
+class FakePuller : public StatsPuller {
+public:
+ FakePuller() : StatsPuller(pullTagId){};
+
+private:
+ bool PullInternal(vector<std::shared_ptr<LogEvent>>* data) override {
+ (*data) = pullData;
+ sleep_for(std::chrono::nanoseconds(pullDelayNs));
+ return pullSuccess;
+ }
+};
+
+FakePuller puller;
+
+shared_ptr<LogEvent> createSimpleEvent(int64_t eventTimeNs, int64_t value) {
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(pullTagId, eventTimeNs);
+ event->write(value);
+ event->init();
+ return event;
+}
+
+class StatsPullerTest : public ::testing::Test {
+public:
+ StatsPullerTest() {
+ }
+
+ void SetUp() override {
+ puller.ForceClearCache();
+ pullSuccess = false;
+ pullDelayNs = 0;
+ pullData.clear();
+ }
+};
+
+TEST_F(StatsPullerTest, PullSucces) {
+ pullData.push_back(createSimpleEvent(1111L, 33));
+
+ pullSuccess = true;
+
+ vector<std::shared_ptr<LogEvent>> dataHolder;
+ EXPECT_TRUE(puller.Pull(&dataHolder));
+ EXPECT_EQ(1, dataHolder.size());
+ EXPECT_EQ(pullTagId, dataHolder[0]->GetTagId());
+ EXPECT_EQ(1111L, dataHolder[0]->GetElapsedTimestampNs());
+ EXPECT_EQ(1, dataHolder[0]->size());
+ EXPECT_EQ(33, dataHolder[0]->getValues()[0].mValue.int_value);
+
+ sleep_for(std::chrono::seconds(1));
+
+ pullData.clear();
+ pullData.push_back(createSimpleEvent(2222L, 44));
+
+ pullSuccess = true;
+
+ EXPECT_TRUE(puller.Pull(&dataHolder));
+ EXPECT_EQ(1, dataHolder.size());
+ EXPECT_EQ(pullTagId, dataHolder[0]->GetTagId());
+ EXPECT_EQ(2222L, dataHolder[0]->GetElapsedTimestampNs());
+ EXPECT_EQ(1, dataHolder[0]->size());
+ EXPECT_EQ(44, dataHolder[0]->getValues()[0].mValue.int_value);
+}
+
+TEST_F(StatsPullerTest, PullFailAfterSuccess) {
+ pullData.push_back(createSimpleEvent(1111L, 33));
+
+ pullSuccess = true;
+
+ vector<std::shared_ptr<LogEvent>> dataHolder;
+ EXPECT_TRUE(puller.Pull(&dataHolder));
+ EXPECT_EQ(1, dataHolder.size());
+ EXPECT_EQ(pullTagId, dataHolder[0]->GetTagId());
+ EXPECT_EQ(1111L, dataHolder[0]->GetElapsedTimestampNs());
+ EXPECT_EQ(1, dataHolder[0]->size());
+ EXPECT_EQ(33, dataHolder[0]->getValues()[0].mValue.int_value);
+
+ sleep_for(std::chrono::seconds(1));
+
+ pullData.clear();
+ pullData.push_back(createSimpleEvent(2222L, 44));
+
+ pullSuccess = false;
+ dataHolder.clear();
+ EXPECT_FALSE(puller.Pull(&dataHolder));
+ EXPECT_EQ(0, dataHolder.size());
+
+ pullSuccess = true;
+ dataHolder.clear();
+ EXPECT_FALSE(puller.Pull(&dataHolder));
+ EXPECT_EQ(0, dataHolder.size());
+}
+
+// Test pull takes longer than timeout, 2nd pull happens shorter than cooldown
+TEST_F(StatsPullerTest, PullTakeTooLongAndPullFast) {
+ pullData.push_back(createSimpleEvent(1111L, 33));
+ pullSuccess = true;
+ // timeout is 0.5
+ pullDelayNs = (long)(0.8 * NS_PER_SEC);
+
+ vector<std::shared_ptr<LogEvent>> dataHolder;
+ EXPECT_FALSE(puller.Pull(&dataHolder));
+ EXPECT_EQ(0, dataHolder.size());
+
+ pullData.clear();
+ pullData.push_back(createSimpleEvent(2222L, 44));
+
+ pullSuccess = true;
+ dataHolder.clear();
+ EXPECT_FALSE(puller.Pull(&dataHolder));
+ EXPECT_EQ(0, dataHolder.size());
+}
+
+TEST_F(StatsPullerTest, PullFail) {
+ pullData.push_back(createSimpleEvent(1111L, 33));
+
+ pullSuccess = false;
+
+ vector<std::shared_ptr<LogEvent>> dataHolder;
+ EXPECT_FALSE(puller.Pull(&dataHolder));
+ EXPECT_EQ(0, dataHolder.size());
+}
+
+TEST_F(StatsPullerTest, PullTakeTooLong) {
+ pullData.push_back(createSimpleEvent(1111L, 33));
+
+ pullSuccess = true;
+ pullDelayNs = NS_PER_SEC;
+
+ vector<std::shared_ptr<LogEvent>> dataHolder;
+ EXPECT_FALSE(puller.Pull(&dataHolder));
+ EXPECT_EQ(0, dataHolder.size());
+}
+
+TEST_F(StatsPullerTest, PullTooFast) {
+ pullData.push_back(createSimpleEvent(1111L, 33));
+
+ pullSuccess = true;
+
+ vector<std::shared_ptr<LogEvent>> dataHolder;
+ EXPECT_TRUE(puller.Pull(&dataHolder));
+ EXPECT_EQ(1, dataHolder.size());
+ EXPECT_EQ(pullTagId, dataHolder[0]->GetTagId());
+ EXPECT_EQ(1111L, dataHolder[0]->GetElapsedTimestampNs());
+ EXPECT_EQ(1, dataHolder[0]->size());
+ EXPECT_EQ(33, dataHolder[0]->getValues()[0].mValue.int_value);
+
+ pullData.clear();
+ pullData.push_back(createSimpleEvent(2222L, 44));
+
+ pullSuccess = true;
+
+ dataHolder.clear();
+ EXPECT_TRUE(puller.Pull(&dataHolder));
+ EXPECT_EQ(1, dataHolder.size());
+ EXPECT_EQ(pullTagId, dataHolder[0]->GetTagId());
+ EXPECT_EQ(1111L, dataHolder[0]->GetElapsedTimestampNs());
+ EXPECT_EQ(1, dataHolder[0]->size());
+ EXPECT_EQ(33, dataHolder[0]->getValues()[0].mValue.int_value);
+}
+
+TEST_F(StatsPullerTest, PullFailsAndTooFast) {
+ pullData.push_back(createSimpleEvent(1111L, 33));
+
+ pullSuccess = false;
+
+ vector<std::shared_ptr<LogEvent>> dataHolder;
+ EXPECT_FALSE(puller.Pull(&dataHolder));
+ EXPECT_EQ(0, dataHolder.size());
+
+ pullData.clear();
+ pullData.push_back(createSimpleEvent(2222L, 44));
+
+ pullSuccess = true;
+
+ EXPECT_FALSE(puller.Pull(&dataHolder));
+ EXPECT_EQ(0, dataHolder.size());
+}
+
+} // namespace statsd
+} // namespace os
+} // namespace android
+#else
+GTEST_LOG_(INFO) << "This test does nothing.\n";
+#endif
diff --git a/cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp b/cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp
index 67a9f7f..2799107 100644
--- a/cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp
+++ b/cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp
@@ -90,6 +90,7 @@
metric.set_id(metricId);
metric.set_bucket(ONE_MINUTE);
metric.mutable_gauge_fields_filter()->set_include_all(false);
+ metric.set_max_pull_delay_sec(INT_MAX);
auto gaugeFieldMatcher = metric.mutable_gauge_fields_filter()->mutable_fields();
gaugeFieldMatcher->set_field(tagId);
gaugeFieldMatcher->add_child()->set_field(1);
@@ -106,9 +107,8 @@
sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return());
- EXPECT_CALL(*pullerManager, Pull(tagId, _, _))
- .WillOnce(Invoke([](int tagId, int64_t timeNs,
- vector<std::shared_ptr<LogEvent>>* data) {
+ EXPECT_CALL(*pullerManager, Pull(tagId, _))
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
event->write(3);
@@ -266,6 +266,7 @@
GaugeMetric metric;
metric.set_id(metricId);
metric.set_bucket(ONE_MINUTE);
+ metric.set_max_pull_delay_sec(INT_MAX);
auto gaugeFieldMatcher = metric.mutable_gauge_fields_filter()->mutable_fields();
gaugeFieldMatcher->set_field(tagId);
gaugeFieldMatcher->add_child()->set_field(2);
@@ -281,10 +282,9 @@
sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return());
- EXPECT_CALL(*pullerManager, Pull(tagId, _, _))
+ EXPECT_CALL(*pullerManager, Pull(tagId, _))
.WillOnce(Return(false))
- .WillOnce(Invoke([](int tagId, int64_t timeNs,
- vector<std::shared_ptr<LogEvent>>* data) {
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, eventUpgradeTimeNs);
event->write("some value");
@@ -341,6 +341,7 @@
GaugeMetric metric;
metric.set_id(metricId);
metric.set_bucket(ONE_MINUTE);
+ metric.set_max_pull_delay_sec(INT_MAX);
auto gaugeFieldMatcher = metric.mutable_gauge_fields_filter()->mutable_fields();
gaugeFieldMatcher->set_field(tagId);
gaugeFieldMatcher->add_child()->set_field(2);
@@ -357,9 +358,8 @@
sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return());
- EXPECT_CALL(*pullerManager, Pull(tagId, _, _))
- .WillOnce(Invoke([](int tagId, int64_t timeNs,
- vector<std::shared_ptr<LogEvent>>* data) {
+ EXPECT_CALL(*pullerManager, Pull(tagId, _))
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
event->write("some value");
@@ -420,6 +420,7 @@
metric.set_bucket(ONE_MINUTE);
metric.mutable_gauge_fields_filter()->set_include_all(true);
metric.set_condition(StringToId("APP_DIED"));
+ metric.set_max_pull_delay_sec(INT_MAX);
auto dim = metric.mutable_dimensions_in_what();
dim->set_field(tagId);
dim->add_child()->set_field(1);
@@ -454,9 +455,8 @@
sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return());
- EXPECT_CALL(*pullerManager, Pull(tagId, _, _))
- .WillOnce(Invoke([](int tagId, int64_t timeNs,
- vector<std::shared_ptr<LogEvent>>* data) {
+ EXPECT_CALL(*pullerManager, Pull(tagId, _))
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
event->write(1000);
@@ -502,11 +502,12 @@
sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return());
- EXPECT_CALL(*pullerManager, Pull(tagId, _, _)).WillOnce(Return(false));
+ EXPECT_CALL(*pullerManager, Pull(tagId, _)).WillOnce(Return(false));
GaugeMetric metric;
metric.set_id(metricId);
metric.set_bucket(ONE_MINUTE);
+ metric.set_max_pull_delay_sec(INT_MAX);
auto gaugeFieldMatcher = metric.mutable_gauge_fields_filter()->mutable_fields();
gaugeFieldMatcher->set_field(tagId);
gaugeFieldMatcher->add_child()->set_field(2);
@@ -591,6 +592,7 @@
metric.set_bucket(ONE_MINUTE);
metric.set_sampling_type(GaugeMetric::FIRST_N_SAMPLES);
metric.mutable_gauge_fields_filter()->set_include_all(false);
+ metric.set_max_pull_delay_sec(INT_MAX);
auto gaugeFieldMatcher = metric.mutable_gauge_fields_filter()->mutable_fields();
gaugeFieldMatcher->set_field(tagId);
gaugeFieldMatcher->add_child()->set_field(1);
@@ -604,9 +606,8 @@
new SimpleLogMatchingTracker(atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)});
sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
- EXPECT_CALL(*pullerManager, Pull(tagId, _, _))
- .WillOnce(Invoke([](int tagId, int64_t timeNs,
- vector<std::shared_ptr<LogEvent>>* data) {
+ EXPECT_CALL(*pullerManager, Pull(tagId, _))
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
event->write(4);
@@ -614,8 +615,7 @@
data->push_back(event);
return true;
}))
- .WillOnce(Invoke([](int tagId, int64_t timeNs,
- vector<std::shared_ptr<LogEvent>>* data) {
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 20);
event->write(5);
@@ -664,6 +664,7 @@
metric.set_bucket(ONE_MINUTE);
metric.set_sampling_type(GaugeMetric::FIRST_N_SAMPLES);
metric.mutable_gauge_fields_filter()->set_include_all(true);
+ metric.set_max_pull_delay_sec(INT_MAX);
auto dimensionMatcher = metric.mutable_dimensions_in_what();
// use field 1 as dimension.
dimensionMatcher->set_field(tagId);
@@ -678,9 +679,8 @@
new SimpleLogMatchingTracker(atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)});
sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
- EXPECT_CALL(*pullerManager, Pull(tagId, _, _))
- .WillOnce(Invoke([](int tagId, int64_t timeNs,
- vector<std::shared_ptr<LogEvent>>* data) {
+ EXPECT_CALL(*pullerManager, Pull(tagId, _))
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 3);
event->write(3);
@@ -689,8 +689,7 @@
data->push_back(event);
return true;
}))
- .WillOnce(Invoke([](int tagId, int64_t timeNs,
- vector<std::shared_ptr<LogEvent>>* data) {
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
event->write(4);
@@ -699,8 +698,7 @@
data->push_back(event);
return true;
}))
- .WillOnce(Invoke([](int tagId, int64_t timeNs,
- vector<std::shared_ptr<LogEvent>>* data) {
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 20);
event->write(4);
diff --git a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
index 5524503..67570fc 100644
--- a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
+++ b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
@@ -125,6 +125,7 @@
metric.set_bucket(ONE_MINUTE);
metric.mutable_value_field()->set_field(tagId);
metric.mutable_value_field()->add_child()->set_field(2);
+ metric.set_max_pull_delay_sec(INT_MAX);
UidMap uidMap;
SimpleAtomMatcher atomMatcher;
@@ -136,9 +137,8 @@
sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return());
- EXPECT_CALL(*pullerManager, Pull(tagId, _, _))
- .WillOnce(Invoke([](int tagId, int64_t timeNs,
- vector<std::shared_ptr<LogEvent>>* data) {
+ EXPECT_CALL(*pullerManager, Pull(tagId, _))
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs);
event->write(tagId);
@@ -218,6 +218,7 @@
metric.set_bucket(ONE_MINUTE);
metric.mutable_value_field()->set_field(tagId);
metric.mutable_value_field()->add_child()->set_field(2);
+ metric.set_max_pull_delay_sec(INT_MAX);
UidMap uidMap;
SimpleAtomMatcher atomMatcher;
@@ -232,9 +233,8 @@
sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return());
- EXPECT_CALL(*pullerManager, Pull(tagId, _, _))
- .WillOnce(Invoke([](int tagId, int64_t timeNs,
- vector<std::shared_ptr<LogEvent>>* data) {
+ EXPECT_CALL(*pullerManager, Pull(tagId, _))
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs);
event->write(3);
@@ -315,6 +315,7 @@
metric.mutable_value_field()->set_field(tagId);
metric.mutable_value_field()->add_child()->set_field(2);
metric.set_use_absolute_value_on_reset(true);
+ metric.set_max_pull_delay_sec(INT_MAX);
UidMap uidMap;
SimpleAtomMatcher atomMatcher;
@@ -326,7 +327,7 @@
sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return());
- EXPECT_CALL(*pullerManager, Pull(tagId, _, _)).WillOnce(Return(true));
+ EXPECT_CALL(*pullerManager, Pull(tagId, _)).WillOnce(Return(true));
ValueMetricProducer valueProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, wizard,
logEventMatcherIndex, eventMatcherWizard, tagId,
@@ -393,6 +394,7 @@
metric.set_bucket(ONE_MINUTE);
metric.mutable_value_field()->set_field(tagId);
metric.mutable_value_field()->add_child()->set_field(2);
+ metric.set_max_pull_delay_sec(INT_MAX);
UidMap uidMap;
SimpleAtomMatcher atomMatcher;
@@ -404,7 +406,7 @@
sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return());
- EXPECT_CALL(*pullerManager, Pull(tagId, _, _)).WillOnce(Return(false));
+ EXPECT_CALL(*pullerManager, Pull(tagId, _)).WillOnce(Return(false));
ValueMetricProducer valueProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, wizard,
logEventMatcherIndex, eventMatcherWizard, tagId,
@@ -469,6 +471,7 @@
metric.mutable_value_field()->set_field(tagId);
metric.mutable_value_field()->add_child()->set_field(2);
metric.set_condition(StringToId("SCREEN_ON"));
+ metric.set_max_pull_delay_sec(INT_MAX);
UidMap uidMap;
SimpleAtomMatcher atomMatcher;
@@ -481,9 +484,8 @@
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return());
- EXPECT_CALL(*pullerManager, Pull(tagId, _, _))
- .WillOnce(Invoke([](int tagId, int64_t timeNs,
- vector<std::shared_ptr<LogEvent>>* data) {
+ EXPECT_CALL(*pullerManager, Pull(tagId, _))
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 8);
event->write(tagId);
@@ -492,8 +494,7 @@
data->push_back(event);
return true;
}))
- .WillOnce(Invoke([](int tagId, int64_t timeNs,
- vector<std::shared_ptr<LogEvent>>* data) {
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
event->write(tagId);
@@ -599,6 +600,7 @@
metric.set_bucket(ONE_MINUTE);
metric.mutable_value_field()->set_field(tagId);
metric.mutable_value_field()->add_child()->set_field(2);
+ metric.set_max_pull_delay_sec(INT_MAX);
UidMap uidMap;
SimpleAtomMatcher atomMatcher;
@@ -610,10 +612,9 @@
sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return());
- EXPECT_CALL(*pullerManager, Pull(tagId, _, _))
+ EXPECT_CALL(*pullerManager, Pull(tagId, _))
.WillOnce(Return(true))
- .WillOnce(Invoke([](int tagId, int64_t timeNs,
- vector<std::shared_ptr<LogEvent>>* data) {
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 149);
event->write(tagId);
@@ -661,6 +662,7 @@
metric.mutable_value_field()->set_field(tagId);
metric.mutable_value_field()->add_child()->set_field(2);
metric.set_condition(StringToId("SCREEN_ON"));
+ metric.set_max_pull_delay_sec(INT_MAX);
UidMap uidMap;
SimpleAtomMatcher atomMatcher;
@@ -672,9 +674,8 @@
sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return());
- EXPECT_CALL(*pullerManager, Pull(tagId, _, _))
- .WillOnce(Invoke([](int tagId, int64_t timeNs,
- vector<std::shared_ptr<LogEvent>>* data) {
+ EXPECT_CALL(*pullerManager, Pull(tagId, _))
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 1);
event->write(tagId);
@@ -683,8 +684,7 @@
data->push_back(event);
return true;
}))
- .WillOnce(Invoke([](int tagId, int64_t timeNs,
- vector<std::shared_ptr<LogEvent>>* data) {
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs - 100);
event->write(tagId);
@@ -924,6 +924,7 @@
metric.set_bucket(ONE_MINUTE);
metric.mutable_value_field()->set_field(tagId);
metric.mutable_value_field()->add_child()->set_field(2);
+ metric.set_max_pull_delay_sec(INT_MAX);
UidMap uidMap;
SimpleAtomMatcher atomMatcher;
@@ -935,7 +936,7 @@
sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return());
- EXPECT_CALL(*pullerManager, Pull(tagId, _, _)).WillOnce(Return(true));
+ EXPECT_CALL(*pullerManager, Pull(tagId, _)).WillOnce(Return(true));
ValueMetricProducer valueProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, wizard,
logEventMatcherIndex, eventMatcherWizard, tagId,
@@ -1012,6 +1013,7 @@
metric.mutable_value_field()->set_field(tagId);
metric.mutable_value_field()->add_child()->set_field(2);
metric.set_condition(StringToId("SCREEN_ON"));
+ metric.set_max_pull_delay_sec(INT_MAX);
UidMap uidMap;
SimpleAtomMatcher atomMatcher;
@@ -1024,10 +1026,9 @@
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return());
- EXPECT_CALL(*pullerManager, Pull(tagId, _, _))
+ EXPECT_CALL(*pullerManager, Pull(tagId, _))
// condition becomes true
- .WillOnce(Invoke([](int tagId, int64_t timeNs,
- vector<std::shared_ptr<LogEvent>>* data) {
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 8);
event->write(tagId);
@@ -1037,8 +1038,7 @@
return true;
}))
// condition becomes false
- .WillOnce(Invoke([](int tagId, int64_t timeNs,
- vector<std::shared_ptr<LogEvent>>* data) {
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
event->write(tagId);
@@ -1098,6 +1098,7 @@
metric.mutable_value_field()->set_field(tagId);
metric.mutable_value_field()->add_child()->set_field(2);
metric.set_condition(StringToId("SCREEN_ON"));
+ metric.set_max_pull_delay_sec(INT_MAX);
UidMap uidMap;
SimpleAtomMatcher atomMatcher;
@@ -1110,10 +1111,9 @@
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillRepeatedly(Return());
EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return());
- EXPECT_CALL(*pullerManager, Pull(tagId, _, _))
+ EXPECT_CALL(*pullerManager, Pull(tagId, _))
// condition becomes true
- .WillOnce(Invoke([](int tagId, int64_t timeNs,
- vector<std::shared_ptr<LogEvent>>* data) {
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 8);
event->write(tagId);
@@ -1123,8 +1123,7 @@
return true;
}))
// condition becomes false
- .WillOnce(Invoke([](int tagId, int64_t timeNs,
- vector<std::shared_ptr<LogEvent>>* data) {
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
event->write(tagId);
@@ -1134,8 +1133,7 @@
return true;
}))
// condition becomes true again
- .WillOnce(Invoke([](int tagId, int64_t timeNs,
- vector<std::shared_ptr<LogEvent>>* data) {
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 25);
event->write(tagId);
@@ -1480,6 +1478,7 @@
metric.mutable_dimensions_in_what()->set_field(tagId);
metric.mutable_dimensions_in_what()->add_child()->set_field(1);
metric.set_use_zero_default_base(true);
+ metric.set_max_pull_delay_sec(INT_MAX);
UidMap uidMap;
SimpleAtomMatcher atomMatcher;
@@ -1491,9 +1490,8 @@
sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return());
- EXPECT_CALL(*pullerManager, Pull(tagId, _, _))
- .WillOnce(Invoke([](int tagId, int64_t timeNs,
- vector<std::shared_ptr<LogEvent>>* data) {
+ EXPECT_CALL(*pullerManager, Pull(tagId, _))
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs);
event->write(1);
@@ -1565,6 +1563,7 @@
metric.mutable_dimensions_in_what()->set_field(tagId);
metric.mutable_dimensions_in_what()->add_child()->set_field(1);
metric.set_use_zero_default_base(true);
+ metric.set_max_pull_delay_sec(INT_MAX);
UidMap uidMap;
SimpleAtomMatcher atomMatcher;
@@ -1576,9 +1575,8 @@
sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return());
- EXPECT_CALL(*pullerManager, Pull(tagId, _, _))
- .WillOnce(Invoke([](int tagId, int64_t timeNs,
- vector<std::shared_ptr<LogEvent>>* data) {
+ EXPECT_CALL(*pullerManager, Pull(tagId, _))
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs);
event->write(1);
@@ -1692,6 +1690,7 @@
metric.mutable_value_field()->add_child()->set_field(2);
metric.mutable_dimensions_in_what()->set_field(tagId);
metric.mutable_dimensions_in_what()->add_child()->set_field(1);
+ metric.set_max_pull_delay_sec(INT_MAX);
UidMap uidMap;
SimpleAtomMatcher atomMatcher;
@@ -1703,9 +1702,8 @@
sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return());
- EXPECT_CALL(*pullerManager, Pull(tagId, _, _))
- .WillOnce(Invoke([](int tagId, int64_t timeNs,
- vector<std::shared_ptr<LogEvent>>* data) {
+ EXPECT_CALL(*pullerManager, Pull(tagId, _))
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs);
event->write(1);
@@ -1804,6 +1802,128 @@
EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
}
+TEST(ValueMetricProducerTest, TestResetBaseOnPullFail) {
+ ValueMetric metric;
+ metric.set_id(metricId);
+ metric.set_bucket(ONE_MINUTE);
+ metric.mutable_value_field()->set_field(tagId);
+ metric.mutable_value_field()->add_child()->set_field(2);
+ metric.set_condition(StringToId("SCREEN_ON"));
+ metric.set_max_pull_delay_sec(INT_MAX);
+
+ UidMap uidMap;
+ SimpleAtomMatcher atomMatcher;
+ atomMatcher.set_atom_id(tagId);
+ sp<EventMatcherWizard> eventMatcherWizard =
+ new EventMatcherWizard({new SimpleLogMatchingTracker(
+ atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)});
+ sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+ sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+ EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
+ EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return());
+
+ EXPECT_CALL(*pullerManager, Pull(tagId, _))
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+ data->clear();
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 8);
+ event->write(tagId);
+ event->write(100);
+ event->init();
+ data->push_back(event);
+ return true;
+ }))
+ .WillOnce(Return(false));
+
+ ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, logEventMatcherIndex,
+ eventMatcherWizard, tagId, bucketStartTimeNs,
+ bucketStartTimeNs, pullerManager);
+
+ valueProducer.onConditionChanged(true, bucketStartTimeNs + 8);
+
+ // has one slice
+ EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+ ValueMetricProducer::Interval& curInterval =
+ valueProducer.mCurrentSlicedBucket.begin()->second[0];
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(100, curInterval.base.long_value);
+ EXPECT_EQ(false, curInterval.hasValue);
+ EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
+
+ valueProducer.onConditionChanged(false, bucketStartTimeNs + 20);
+
+ // has one slice
+ EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+ EXPECT_EQ(false, curInterval.hasValue);
+ EXPECT_EQ(false, curInterval.hasBase);
+ EXPECT_EQ(false, valueProducer.mHasGlobalBase);
+}
+
+TEST(ValueMetricProducerTest, TestResetBaseOnPullTooLate) {
+ ValueMetric metric;
+ metric.set_id(metricId);
+ metric.set_bucket(ONE_MINUTE);
+ metric.mutable_value_field()->set_field(tagId);
+ metric.mutable_value_field()->add_child()->set_field(2);
+ metric.set_condition(StringToId("SCREEN_ON"));
+ metric.set_max_pull_delay_sec(0);
+
+ UidMap uidMap;
+ SimpleAtomMatcher atomMatcher;
+ atomMatcher.set_atom_id(tagId);
+ sp<EventMatcherWizard> eventMatcherWizard =
+ new EventMatcherWizard({new SimpleLogMatchingTracker(
+ atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)});
+ sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+ sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+ EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
+ EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return());
+
+ EXPECT_CALL(*pullerManager, Pull(tagId, _))
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+ data->clear();
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
+ event->write(tagId);
+ event->write(120);
+ event->init();
+ data->push_back(event);
+ return true;
+ }));
+
+ ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, logEventMatcherIndex,
+ eventMatcherWizard, tagId, bucketStartTimeNs,
+ bucketStartTimeNs, pullerManager);
+
+ valueProducer.mCondition = true;
+ valueProducer.mHasGlobalBase = true;
+
+ vector<shared_ptr<LogEvent>> allData;
+ allData.clear();
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
+ event->write(1);
+ event->write(110);
+ event->init();
+ allData.push_back(event);
+ valueProducer.onDataPulled(allData);
+
+ // has one slice
+ EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+ ValueMetricProducer::Interval& curInterval =
+ valueProducer.mCurrentSlicedBucket.begin()->second[0];
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(110, curInterval.base.long_value);
+ EXPECT_EQ(false, curInterval.hasValue);
+ EXPECT_EQ(0UL, valueProducer.mPastBuckets.size());
+ EXPECT_EQ(true, valueProducer.mHasGlobalBase);
+
+ valueProducer.onConditionChanged(false, bucket2StartTimeNs + 1);
+
+ // has one slice
+ EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+ EXPECT_EQ(false, curInterval.hasValue);
+ EXPECT_EQ(false, curInterval.hasBase);
+ EXPECT_EQ(false, valueProducer.mHasGlobalBase);
+}
+
} // namespace statsd
} // namespace os
} // namespace android
diff --git a/cmds/statsd/tests/metrics/metrics_test_helper.h b/cmds/statsd/tests/metrics/metrics_test_helper.h
index 5afaba6..97c1072 100644
--- a/cmds/statsd/tests/metrics/metrics_test_helper.h
+++ b/cmds/statsd/tests/metrics/metrics_test_helper.h
@@ -38,8 +38,7 @@
MOCK_METHOD4(RegisterReceiver, void(int tagId, wp<PullDataReceiver> receiver,
int64_t nextPulltimeNs, int64_t intervalNs));
MOCK_METHOD2(UnRegisterReceiver, void(int tagId, wp<PullDataReceiver> receiver));
- MOCK_METHOD3(Pull, bool(const int pullCode, const int64_t timeNs,
- vector<std::shared_ptr<LogEvent>>* data));
+ MOCK_METHOD2(Pull, bool(const int pullCode, vector<std::shared_ptr<LogEvent>>* data));
};
class MockUidMap : public UidMap {
diff --git a/cmds/statsd/tests/shell/ShellSubscriber_test.cpp b/cmds/statsd/tests/shell/ShellSubscriber_test.cpp
index dd00561..a184f56 100644
--- a/cmds/statsd/tests/shell/ShellSubscriber_test.cpp
+++ b/cmds/statsd/tests/shell/ShellSubscriber_test.cpp
@@ -189,23 +189,22 @@
sp<MockUidMap> uidMap = new NaggyMock<MockUidMap>();
sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
- EXPECT_CALL(*pullerManager, Pull(10016, _, _))
- .WillRepeatedly(
- Invoke([](int tagId, int64_t timeNs, vector<std::shared_ptr<LogEvent>>* data) {
- data->clear();
- shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, timeNs);
- event->write(kUid1);
- event->write(kCpuTime1);
- event->init();
- data->push_back(event);
- // another event
- event = make_shared<LogEvent>(tagId, timeNs);
- event->write(kUid2);
- event->write(kCpuTime2);
- event->init();
- data->push_back(event);
- return true;
- }));
+ EXPECT_CALL(*pullerManager, Pull(10016, _))
+ .WillRepeatedly(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+ data->clear();
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, 1111L);
+ event->write(kUid1);
+ event->write(kCpuTime1);
+ event->init();
+ data->push_back(event);
+ // another event
+ event = make_shared<LogEvent>(tagId, 1111L);
+ event->write(kUid2);
+ event->write(kCpuTime2);
+ event->init();
+ data->push_back(event);
+ return true;
+ }));
runShellTest(getPulledConfig(), uidMap, pullerManager, vector<std::shared_ptr<LogEvent>>(),
getExpectedShellData());