Reset the base when pull fails.
Without resetting the base, we will compute the wrong diff when we have
a condition change.
Move the responsability of handling failures to the PullReceivers. This
is more consistent with #onConditionChanged which does handle the
failure cases.
We also always compute nextPullTimeNs in order to only call onDataPulled
on bucket boundaries. The current code does not update nextPullTimeNs
which means a new alarm might trigger a pull and onDataPulled in the
middle of the bucket. The behavior in this case is undefined.
Bug: 123181864
Test: atest statsd_test
Change-Id: I0910b7db26a0de764436c46c8d7d11cafa52dcd9
diff --git a/cmds/statsd/src/external/PullDataReceiver.h b/cmds/statsd/src/external/PullDataReceiver.h
index 0d505cb..b071682 100644
--- a/cmds/statsd/src/external/PullDataReceiver.h
+++ b/cmds/statsd/src/external/PullDataReceiver.h
@@ -28,9 +28,15 @@
class PullDataReceiver : virtual public RefBase{
public:
virtual ~PullDataReceiver() {}
- virtual void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data) = 0;
+ /**
+ * @param data The pulled data.
+ * @param pullSuccess Whether the pull succeeded. If the pull does not succeed, the data for the
+ * bucket should be invalidated.
+ */
+ virtual void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data,
+ bool pullSuccess) = 0;
};
} // namespace statsd
} // namespace os
-} // namespace android
\ No newline at end of file
+} // namespace android
diff --git a/cmds/statsd/src/external/StatsPullerManager.cpp b/cmds/statsd/src/external/StatsPullerManager.cpp
index c69384c..9b603d6 100644
--- a/cmds/statsd/src/external/StatsPullerManager.cpp
+++ b/cmds/statsd/src/external/StatsPullerManager.cpp
@@ -358,12 +358,13 @@
for (const auto& pullInfo : needToPull) {
vector<shared_ptr<LogEvent>> data;
- if (!Pull(pullInfo.first, &data)) {
+ bool pullSuccess = Pull(pullInfo.first, &data);
+ if (pullSuccess) {
+ StatsdStats::getInstance().notePullDelay(
+ pullInfo.first, getElapsedRealtimeNs() - elapsedTimeNs);
+ } else {
VLOG("pull failed at %lld, will try again later", (long long)elapsedTimeNs);
- continue;
}
- StatsdStats::getInstance().notePullDelay(pullInfo.first,
- getElapsedRealtimeNs() - elapsedTimeNs);
// Convention is to mark pull atom timestamp at request time.
// If we pull at t0, puller starts at t1, finishes at t2, and send back
@@ -380,8 +381,8 @@
for (const auto& receiverInfo : pullInfo.second) {
sp<PullDataReceiver> receiverPtr = receiverInfo->receiver.promote();
if (receiverPtr != nullptr) {
- receiverPtr->onDataPulled(data);
- // we may have just come out of a coma, compute next pull time
+ receiverPtr->onDataPulled(data, pullSuccess);
+ // We may have just come out of a coma, compute next pull time.
int numBucketsAhead =
(elapsedTimeNs - receiverInfo->nextPullTimeNs) / receiverInfo->intervalNs;
receiverInfo->nextPullTimeNs += (numBucketsAhead + 1) * receiverInfo->intervalNs;
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
index c2878f0..c9b7165 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
@@ -406,9 +406,10 @@
return gaugeFields;
}
-void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) {
+void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData,
+ bool pullSuccess) {
std::lock_guard<std::mutex> lock(mMutex);
- if (allData.size() == 0) {
+ if (!pullSuccess || allData.size() == 0) {
return;
}
for (const auto& data : allData) {
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.h b/cmds/statsd/src/metrics/GaugeMetricProducer.h
index df08779..d480941 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.h
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.h
@@ -67,7 +67,8 @@
virtual ~GaugeMetricProducer();
// Handles when the pulled data arrives.
- void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data) override;
+ void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data,
+ bool pullSuccess) override;
// GaugeMetric needs to immediately trigger another pull when we create the partial bucket.
void notifyAppUpgrade(const int64_t& eventTimeNs, const string& apk, const int uid,
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index 6aa8e84..9fe07e1 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -382,9 +382,16 @@
return mTimeBaseNs + ((currentTimeNs - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs;
}
-void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) {
+void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData,
+ bool pullSuccess) {
std::lock_guard<std::mutex> lock(mMutex);
if (mCondition) {
+ if (!pullSuccess) {
+ // If the pull failed, we won't be able to compute a diff.
+ resetBase();
+ return;
+ }
+
if (allData.size() == 0) {
VLOG("Data pulled is empty");
StatsdStats::getInstance().noteEmptyData(mPullTagId);
@@ -399,12 +406,13 @@
// if the diff base will be cleared and this new data will serve as new diff base.
int64_t realEventTime = allData.at(0)->GetElapsedTimestampNs();
int64_t bucketEndTime = calcPreviousBucketEndTime(realEventTime) - 1;
- if (bucketEndTime < mCurrentBucketStartTimeNs) {
+ bool isEventLate = bucketEndTime < mCurrentBucketStartTimeNs;
+ if (isEventLate) {
VLOG("Skip bucket end pull due to late arrival: %lld vs %lld", (long long)bucketEndTime,
(long long)mCurrentBucketStartTimeNs);
StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
- return;
}
+
for (const auto& data : allData) {
LogEvent localCopy = data->makeCopy();
if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) ==
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h
index a8dfc5b..cab50aa 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.h
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.h
@@ -51,7 +51,8 @@
virtual ~ValueMetricProducer();
// Process data pulled on bucket boundary.
- void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data) override;
+ void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data,
+ bool pullSuccess) override;
// ValueMetric needs special logic if it's a pulled atom.
void notifyAppUpgrade(const int64_t& eventTimeNs, const string& apk, const int uid,
@@ -216,7 +217,9 @@
FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBase);
FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures);
FRIEND_TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey);
- FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFail);
+ FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailBeforeConditionChange);
+ FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailAfterConditionChange);
+ FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailAfterConditionChange_EndOfBucket);
FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullTooLate);
};
diff --git a/cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp b/cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp
index 0ffbb54..1725160 100644
--- a/cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp
+++ b/cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp
@@ -133,7 +133,7 @@
event->init();
allData.push_back(event);
- gaugeProducer.onDataPulled(allData);
+ gaugeProducer.onDataPulled(allData, /** succeed */ true);
EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
auto it = gaugeProducer.mCurrentSlicedBucket->begin()->second.front().mFields->begin();
EXPECT_EQ(INT, it->mValue.getType());
@@ -151,7 +151,7 @@
event2->write(25);
event2->init();
allData.push_back(event2);
- gaugeProducer.onDataPulled(allData);
+ gaugeProducer.onDataPulled(allData, /** succeed */ true);
EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
it = gaugeProducer.mCurrentSlicedBucket->begin()->second.front().mFields->begin();
EXPECT_EQ(INT, it->mValue.getType());
@@ -305,7 +305,7 @@
event->write(1);
event->init();
allData.push_back(event);
- gaugeProducer.onDataPulled(allData);
+ gaugeProducer.onDataPulled(allData, /** succeed */ true);
EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
EXPECT_EQ(1, gaugeProducer.mCurrentSlicedBucket->begin()
->second.front()
@@ -328,7 +328,7 @@
event->write(3);
event->init();
allData.push_back(event);
- gaugeProducer.onDataPulled(allData);
+ gaugeProducer.onDataPulled(allData, /** succeed */ true);
EXPECT_EQ(2UL, gaugeProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size());
EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
EXPECT_EQ(3, gaugeProducer.mCurrentSlicedBucket->begin()
@@ -371,7 +371,7 @@
event->write(1);
event->init();
allData.push_back(event);
- gaugeProducer.onDataPulled(allData);
+ gaugeProducer.onDataPulled(allData, /** succeed */ true);
EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
EXPECT_EQ(1, gaugeProducer.mCurrentSlicedBucket->begin()
->second.front()
@@ -440,7 +440,7 @@
event->write(110);
event->init();
allData.push_back(event);
- gaugeProducer.onDataPulled(allData);
+ gaugeProducer.onDataPulled(allData, /** succeed */ true);
EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
EXPECT_EQ(110, gaugeProducer.mCurrentSlicedBucket->begin()
@@ -541,7 +541,7 @@
event->write(110);
event->init();
allData.push_back(event);
- gaugeProducer.onDataPulled(allData);
+ gaugeProducer.onDataPulled(allData, /** succeed */ true);
EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
EXPECT_EQ(1UL, gaugeProducer.mPastBuckets.size());
@@ -590,7 +590,7 @@
event1->write(13);
event1->init();
- gaugeProducer.onDataPulled({event1});
+ gaugeProducer.onDataPulled({event1}, /** succeed */ true);
EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
EXPECT_EQ(13L, gaugeProducer.mCurrentSlicedBucket->begin()
->second.front()
@@ -604,7 +604,7 @@
event2->write(15);
event2->init();
- gaugeProducer.onDataPulled({event2});
+ gaugeProducer.onDataPulled({event2}, /** succeed */ true);
EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
EXPECT_EQ(15L, gaugeProducer.mCurrentSlicedBucket->begin()
->second.front()
@@ -619,7 +619,7 @@
event3->write(26);
event3->init();
- gaugeProducer.onDataPulled({event3});
+ gaugeProducer.onDataPulled({event3}, /** succeed */ true);
EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
EXPECT_EQ(26L, gaugeProducer.mCurrentSlicedBucket->begin()
->second.front()
@@ -633,7 +633,7 @@
std::make_shared<LogEvent>(tagId, bucketStartTimeNs + 3 * bucketSizeNs + 10);
event4->write("some value");
event4->init();
- gaugeProducer.onDataPulled({event4});
+ gaugeProducer.onDataPulled({event4}, /** succeed */ true);
EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
EXPECT_TRUE(gaugeProducer.mCurrentSlicedBucket->begin()->second.front().mFields->empty());
}
diff --git a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
index c0648ee..df0ae38 100644
--- a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
+++ b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
@@ -160,7 +160,7 @@
event->init();
allData.push_back(event);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
// has one slice
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -177,7 +177,7 @@
event->write(23);
event->init();
allData.push_back(event);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
// has one slice
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -196,7 +196,7 @@
event->write(36);
event->init();
allData.push_back(event);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -256,7 +256,7 @@
event->init();
allData.push_back(event);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
// has one slice
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
ValueMetricProducer::Interval curInterval =
@@ -274,7 +274,7 @@
event->write(23);
event->init();
allData.push_back(event);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
// has one slice
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -292,7 +292,7 @@
event->write(36);
event->init();
allData.push_back(event);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -341,7 +341,7 @@
event->init();
allData.push_back(event);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
// has one slice
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -357,7 +357,7 @@
event->write(10);
event->init();
allData.push_back(event);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
// has one slice
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -373,7 +373,7 @@
event->write(36);
event->init();
allData.push_back(event);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
EXPECT_EQ(true, curInterval.hasBase);
@@ -420,7 +420,7 @@
event->init();
allData.push_back(event);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
// has one slice
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -436,7 +436,7 @@
event->write(10);
event->init();
allData.push_back(event);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
// has one slice
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -451,7 +451,7 @@
event->write(36);
event->init();
allData.push_back(event);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
EXPECT_EQ(true, curInterval.hasBase);
@@ -525,7 +525,7 @@
event->write(110);
event->init();
allData.push_back(event);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
// has one slice
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
@@ -635,7 +635,7 @@
event->init();
allData.push_back(event);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
valueProducer.notifyAppUpgrade(bucket2StartTimeNs + 150, "ANY.APP", 1, 1);
@@ -650,7 +650,7 @@
event->write(150);
event->init();
allData.push_back(event);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
EXPECT_EQ(1UL, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size());
EXPECT_EQ(bucket2StartTimeNs + 150, valueProducer.mCurrentBucketStartTimeNs);
EXPECT_EQ(20L,
@@ -689,7 +689,7 @@
event->init();
allData.push_back(event);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
valueProducer.notifyAppUpgrade(bucket2StartTimeNs + 150, "ANY.APP", 1, 1);
@@ -993,7 +993,7 @@
event->init();
allData.push_back(event);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
// has one slice
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -1011,7 +1011,7 @@
event->write(23);
event->init();
allData.push_back(event);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
// has one slice
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -1032,7 +1032,7 @@
event->write(36);
event->init();
allData.push_back(event);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
// startUpdated:false sum:12
@@ -1120,7 +1120,7 @@
event->write(110);
event->init();
allData.push_back(event);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
EXPECT_EQ(false, curInterval.hasBase);
@@ -1224,7 +1224,7 @@
event->write(110);
event->init();
allData.push_back(event);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
EXPECT_EQ(true, curInterval.hasBase);
@@ -1677,7 +1677,7 @@
allData.push_back(event1);
allData.push_back(event2);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size());
EXPECT_EQ(true, interval1.hasBase);
EXPECT_EQ(11, interval1.base.long_value);
@@ -1762,7 +1762,7 @@
allData.push_back(event1);
allData.push_back(event2);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size());
EXPECT_EQ(true, interval1.hasBase);
EXPECT_EQ(11, interval1.base.long_value);
@@ -1791,7 +1791,7 @@
event1->write(5);
event1->init();
allData.push_back(event1);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size());
EXPECT_EQ(true, interval2.hasBase);
@@ -1813,7 +1813,7 @@
event2->write(5);
event2->init();
allData.push_back(event2);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size());
EXPECT_EQ(true, interval2.hasBase);
@@ -1888,7 +1888,7 @@
allData.push_back(event1);
allData.push_back(event2);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size());
EXPECT_EQ(true, interval1.hasBase);
EXPECT_EQ(11, interval1.base.long_value);
@@ -1918,7 +1918,7 @@
event1->write(5);
event1->init();
allData.push_back(event1);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size());
@@ -1941,7 +1941,7 @@
event1->write(13);
event1->init();
allData.push_back(event1);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
EXPECT_EQ(true, interval2.hasBase);
@@ -1951,7 +1951,60 @@
EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
}
-TEST(ValueMetricProducerTest, TestResetBaseOnPullFail) {
+TEST(ValueMetricProducerTest, TestResetBaseOnPullFailAfterConditionChange_EndOfBucket) {
+ ValueMetric metric;
+ metric.set_id(metricId);
+ metric.set_bucket(ONE_MINUTE);
+ metric.mutable_value_field()->set_field(tagId);
+ metric.mutable_value_field()->add_child()->set_field(2);
+ metric.set_condition(StringToId("SCREEN_ON"));
+ metric.set_max_pull_delay_sec(INT_MAX);
+
+ UidMap uidMap;
+ SimpleAtomMatcher atomMatcher;
+ atomMatcher.set_atom_id(tagId);
+ sp<EventMatcherWizard> eventMatcherWizard =
+ new EventMatcherWizard({new SimpleLogMatchingTracker(
+ atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)});
+ sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+ sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+ EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
+ EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return());
+
+ // Used by onConditionChanged.
+ EXPECT_CALL(*pullerManager, Pull(tagId, _))
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+ data->clear();
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 8);
+ event->write(tagId);
+ event->write(100);
+ event->init();
+ data->push_back(event);
+ return true;
+ }));
+
+ ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, logEventMatcherIndex,
+ eventMatcherWizard, tagId, bucketStartTimeNs,
+ bucketStartTimeNs, pullerManager);
+
+ valueProducer.onConditionChanged(true, bucketStartTimeNs + 8);
+ // has one slice
+ EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+ ValueMetricProducer::Interval& curInterval =
+ valueProducer.mCurrentSlicedBucket.begin()->second[0];
+ EXPECT_EQ(true, curInterval.hasBase);
+ EXPECT_EQ(100, curInterval.base.long_value);
+ EXPECT_EQ(false, curInterval.hasValue);
+
+ vector<shared_ptr<LogEvent>> allData;
+ valueProducer.onDataPulled(allData, /** succeed */ false);
+ EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+ EXPECT_EQ(false, curInterval.hasBase);
+ EXPECT_EQ(false, curInterval.hasValue);
+ EXPECT_EQ(false, valueProducer.mHasGlobalBase);
+}
+
+TEST(ValueMetricProducerTest, TestResetBaseOnPullFailAfterConditionChange) {
ValueMetric metric;
metric.set_id(metricId);
metric.set_bucket(ONE_MINUTE);
@@ -2007,6 +2060,53 @@
EXPECT_EQ(false, valueProducer.mHasGlobalBase);
}
+TEST(ValueMetricProducerTest, TestResetBaseOnPullFailBeforeConditionChange) {
+ ValueMetric metric;
+ metric.set_id(metricId);
+ metric.set_bucket(ONE_MINUTE);
+ metric.mutable_value_field()->set_field(tagId);
+ metric.mutable_value_field()->add_child()->set_field(2);
+ metric.set_condition(StringToId("SCREEN_ON"));
+ metric.set_max_pull_delay_sec(0);
+
+ UidMap uidMap;
+ SimpleAtomMatcher atomMatcher;
+ atomMatcher.set_atom_id(tagId);
+ sp<EventMatcherWizard> eventMatcherWizard =
+ new EventMatcherWizard({new SimpleLogMatchingTracker(
+ atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)});
+ sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+ sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+ EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
+ EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return());
+
+ EXPECT_CALL(*pullerManager, Pull(tagId, _))
+ .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+ data->clear();
+ shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 8);
+ event->write(tagId);
+ event->write(100);
+ event->init();
+ data->push_back(event);
+ return true;
+ }));
+
+ ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, logEventMatcherIndex,
+ eventMatcherWizard, tagId, bucketStartTimeNs,
+ bucketStartTimeNs, pullerManager);
+
+ valueProducer.mCondition = true;
+
+ vector<shared_ptr<LogEvent>> allData;
+ valueProducer.onDataPulled(allData, /** succeed */ false);
+ EXPECT_EQ(0UL, valueProducer.mCurrentSlicedBucket.size());
+ ValueMetricProducer::Interval& curInterval =
+ valueProducer.mCurrentSlicedBucket.begin()->second[0];
+
+ valueProducer.onConditionChanged(false, bucket2StartTimeNs + 1);
+ EXPECT_EQ(0UL, valueProducer.mCurrentSlicedBucket.size());
+}
+
TEST(ValueMetricProducerTest, TestResetBaseOnPullTooLate) {
ValueMetric metric;
metric.set_id(metricId);
@@ -2052,7 +2152,7 @@
event->write(110);
event->init();
allData.push_back(event);
- valueProducer.onDataPulled(allData);
+ valueProducer.onDataPulled(allData, /** succeed */ true);
// has one slice
EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());