Puller refactor
1) Refactor pullers and add tests.
2) Add a timeout to pullers.
mPullTimeoutNs is intrinsic to each puller. A pull that takes longer than
this is deemed to have failed, and its data is discarded.
A metric or StatsPullerManager that requests a pull should monitor the pull
against its own deadline (deadlineNs), because even a successful pull may
arrive later than desired due to statsd processing delays.
3) Add unit tests for the pullers, now that the base puller is more
complicated.
Bug: 118756964
Test: unit test
Change-Id: I0e5d47e2527391f7beef4b2d06bfd5c2f82f1179
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
index ec60244..67a1a47 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
@@ -68,15 +68,12 @@
const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 7;
const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 8;
-GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric& metric,
- const int conditionIndex,
- const sp<ConditionWizard>& wizard,
- const int whatMatcherIndex,
- const sp<EventMatcherWizard>& matcherWizard,
- const int pullTagId,
- const int triggerAtomId, const int atomId,
- const int64_t timeBaseNs, const int64_t startTimeNs,
- const sp<StatsPullerManager>& pullerManager)
+GaugeMetricProducer::GaugeMetricProducer(
+ const ConfigKey& key, const GaugeMetric& metric, const int conditionIndex,
+ const sp<ConditionWizard>& wizard, const int whatMatcherIndex,
+ const sp<EventMatcherWizard>& matcherWizard, const int pullTagId, const int triggerAtomId,
+ const int atomId, const int64_t timeBaseNs, const int64_t startTimeNs,
+ const sp<StatsPullerManager>& pullerManager)
: MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, wizard),
mWhatMatcherIndex(whatMatcherIndex),
mEventMatcherWizard(matcherWizard),
@@ -86,6 +83,8 @@
mAtomId(atomId),
mIsPulled(pullTagId != -1),
mMinBucketSizeNs(metric.min_bucket_size_nanos()),
+ mMaxPullDelayNs(metric.max_pull_delay_sec() > 0 ? metric.max_pull_delay_sec() * NS_PER_SEC
+ : StatsdStats::kPullMaxDelayNs),
mDimensionSoftLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) !=
StatsdStats::kAtomDimensionKeySizeLimitMap.end()
? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).first
@@ -338,14 +337,24 @@
return;
}
vector<std::shared_ptr<LogEvent>> allData;
- if (!mPullerManager->Pull(mPullTagId, timestampNs, &allData)) {
+ if (!mPullerManager->Pull(mPullTagId, &allData)) {
ALOGE("Gauge Stats puller failed for tag: %d at %lld", mPullTagId, (long long)timestampNs);
return;
}
+ const int64_t pullDelayNs = getElapsedRealtimeNs() - timestampNs;
+ if (pullDelayNs > mMaxPullDelayNs) {
+ ALOGE("Pull finish too late for atom %d", mPullTagId);
+ StatsdStats::getInstance().notePullExceedMaxDelay(mPullTagId);
+ StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
+ return;
+ }
+ StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
for (const auto& data : allData) {
- if (mEventMatcherWizard->matchLogEvent(
- *data, mWhatMatcherIndex) == MatchingState::kMatched) {
- onMatchedLogEventLocked(mWhatMatcherIndex, *data);
+ LogEvent localCopy = data->makeCopy();
+ localCopy.setElapsedTimestampNs(timestampNs);
+ if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) ==
+ MatchingState::kMatched) {
+ onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
}
}
}
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.h b/cmds/statsd/src/metrics/GaugeMetricProducer.h
index 6e3530b..a1a5061 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.h
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.h
@@ -160,6 +160,8 @@
GaugeMetric::SamplingType mSamplingType;
+ const int64_t mMaxPullDelayNs;
+
// apply a whitelist on the original input
std::shared_ptr<vector<FieldValue>> getGaugeFields(const LogEvent& event);
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index cf56e2d..9a8e3bd 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -103,7 +103,9 @@
mValueDirection(metric.value_direction()),
mSkipZeroDiffOutput(metric.skip_zero_diff_output()),
mUseZeroDefaultBase(metric.use_zero_default_base()),
- mHasGlobalBase(false) {
+ mHasGlobalBase(false),
+ mMaxPullDelayNs(metric.max_pull_delay_sec() > 0 ? metric.max_pull_delay_sec() * NS_PER_SEC
+ : StatsdStats::kPullMaxDelayNs) {
int64_t bucketSizeMills = 0;
if (metric.has_bucket()) {
bucketSizeMills = TimeUnitToBucketSizeInMillisGuardrailed(key.GetUid(), metric.bucket());
@@ -340,19 +342,32 @@
void ValueMetricProducer::pullAndMatchEventsLocked(const int64_t timestampNs) {
vector<std::shared_ptr<LogEvent>> allData;
- if (mPullerManager->Pull(mPullTagId, timestampNs, &allData)) {
- for (const auto& data : allData) {
- if (mEventMatcherWizard->matchLogEvent(
- *data, mWhatMatcherIndex) == MatchingState::kMatched) {
- onMatchedLogEventLocked(mWhatMatcherIndex, *data);
- }
- }
- mHasGlobalBase = true;
- } else {
- // for pulled data, every pull is needed. So we reset the base if any
- // pull fails.
+ if (!mPullerManager->Pull(mPullTagId, &allData)) {
+ ALOGE("Value Stats puller failed for tag: %d at %lld", mPullTagId, (long long)timestampNs);
resetBase();
+ return;
}
+ const int64_t pullDelayNs = getElapsedRealtimeNs() - timestampNs;
+ if (pullDelayNs > mMaxPullDelayNs) {
+ ALOGE("Pull finish too late for atom %d, longer than %lld", mPullTagId,
+ (long long)mMaxPullDelayNs);
+ StatsdStats::getInstance().notePullExceedMaxDelay(mPullTagId);
+ StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
+ resetBase();
+ return;
+ }
+ StatsdStats::getInstance().notePullDelay(mPullTagId, pullDelayNs);
+
+ for (const auto& data : allData) {
+ // allData holds shared_ptrs: copy before changing the timestamp instead of mutating in place.
+ LogEvent localCopy = data->makeCopy();
+ localCopy.setElapsedTimestampNs(timestampNs);
+ if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) ==
+ MatchingState::kMatched) {
+ onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
+ }
+ }
+ mHasGlobalBase = true;
}
int64_t ValueMetricProducer::calcPreviousBucketEndTime(const int64_t currentTimeNs) {
@@ -381,10 +396,11 @@
return;
}
for (const auto& data : allData) {
- if (mEventMatcherWizard->matchLogEvent(*data, mWhatMatcherIndex) ==
+ LogEvent localCopy = data->makeCopy();
+ if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) ==
MatchingState::kMatched) {
- data->setElapsedTimestampNs(bucketEndTime);
- onMatchedLogEventLocked(mWhatMatcherIndex, *data);
+ localCopy.setElapsedTimestampNs(bucketEndTime);
+ onMatchedLogEventLocked(mWhatMatcherIndex, localCopy);
}
}
mHasGlobalBase = true;
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h
index 4991af4..4865aee 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.h
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.h
@@ -183,6 +183,8 @@
// diff against.
bool mHasGlobalBase;
+ const int64_t mMaxPullDelayNs;
+
FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsNoCondition);
FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsWithFiltering);
FRIEND_TEST(ValueMetricProducerTest, TestPulledEventsTakeAbsoluteValueOnReset);
@@ -207,6 +209,8 @@
FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBase);
FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures);
FRIEND_TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey);
+ FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFail);
+ FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullTooLate);
};
} // namespace statsd