Reset the base when pull fails.

Without resetting the base, we will compute the wrong diff when we have
a condition change.

Move the responsability of handling failures to the PullReceivers. This
is more consistent with #onConditionChanged which does handle the
failure cases.

We also always compute nextPullTimeNs in order to only call onDataPulled
on bucket boundaries. The current code does not update nextPullTimeNs
which means a new alarm might trigger a pull and onDataPulled in the
middle of the bucket. The behavior in this case is undefined.

Bug: 123181864
Test: atest statsd_test
Change-Id: I0910b7db26a0de764436c46c8d7d11cafa52dcd9
diff --git a/cmds/statsd/src/external/PullDataReceiver.h b/cmds/statsd/src/external/PullDataReceiver.h
index 0d505cb..b071682 100644
--- a/cmds/statsd/src/external/PullDataReceiver.h
+++ b/cmds/statsd/src/external/PullDataReceiver.h
@@ -28,9 +28,15 @@
 class PullDataReceiver : virtual public RefBase{
  public:
   virtual ~PullDataReceiver() {}
-  virtual void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data) = 0;
+  /**
+   * @param data The pulled data.
+   * @param pullSuccess Whether the pull succeeded. If the pull does not succeed, the data for the
+   * bucket should be invalidated.
+   */
+  virtual void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data, 
+                            bool pullSuccess) = 0;
 };
 
 }  // namespace statsd
 }  // namespace os
-}  // namespace android
\ No newline at end of file
+}  // namespace android
diff --git a/cmds/statsd/src/external/StatsPullerManager.cpp b/cmds/statsd/src/external/StatsPullerManager.cpp
index c69384c..9b603d6 100644
--- a/cmds/statsd/src/external/StatsPullerManager.cpp
+++ b/cmds/statsd/src/external/StatsPullerManager.cpp
@@ -358,12 +358,13 @@
 
     for (const auto& pullInfo : needToPull) {
         vector<shared_ptr<LogEvent>> data;
-        if (!Pull(pullInfo.first, &data)) {
+        bool pullSuccess = Pull(pullInfo.first, &data);
+        if (pullSuccess) {
+            StatsdStats::getInstance().notePullDelay(
+                    pullInfo.first, getElapsedRealtimeNs() - elapsedTimeNs);
+        } else {
             VLOG("pull failed at %lld, will try again later", (long long)elapsedTimeNs);
-            continue;
         }
-        StatsdStats::getInstance().notePullDelay(pullInfo.first,
-                                                 getElapsedRealtimeNs() - elapsedTimeNs);
 
         // Convention is to mark pull atom timestamp at request time.
         // If we pull at t0, puller starts at t1, finishes at t2, and send back
@@ -380,8 +381,8 @@
         for (const auto& receiverInfo : pullInfo.second) {
             sp<PullDataReceiver> receiverPtr = receiverInfo->receiver.promote();
             if (receiverPtr != nullptr) {
-                receiverPtr->onDataPulled(data);
-                // we may have just come out of a coma, compute next pull time
+                receiverPtr->onDataPulled(data, pullSuccess);
+                // We may have just come out of a coma, compute next pull time.
                 int numBucketsAhead =
                         (elapsedTimeNs - receiverInfo->nextPullTimeNs) / receiverInfo->intervalNs;
                 receiverInfo->nextPullTimeNs += (numBucketsAhead + 1) * receiverInfo->intervalNs;
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
index c2878f0..c9b7165 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
@@ -406,9 +406,10 @@
     return gaugeFields;
 }
 
-void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) {
+void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData,
+                                       bool pullSuccess) {
     std::lock_guard<std::mutex> lock(mMutex);
-    if (allData.size() == 0) {
+    if (!pullSuccess || allData.size() == 0) {
         return;
     }
     for (const auto& data : allData) {
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.h b/cmds/statsd/src/metrics/GaugeMetricProducer.h
index df08779..d480941 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.h
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.h
@@ -67,7 +67,8 @@
     virtual ~GaugeMetricProducer();
 
     // Handles when the pulled data arrives.
-    void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data) override;
+    void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data,
+                      bool pullSuccess) override;
 
     // GaugeMetric needs to immediately trigger another pull when we create the partial bucket.
     void notifyAppUpgrade(const int64_t& eventTimeNs, const string& apk, const int uid,
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index 6aa8e84..9fe07e1 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -382,9 +382,16 @@
     return mTimeBaseNs + ((currentTimeNs - mTimeBaseNs) / mBucketSizeNs) * mBucketSizeNs;
 }
 
-void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) {
+void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData,
+                                       bool pullSuccess) {
     std::lock_guard<std::mutex> lock(mMutex);
     if (mCondition) {
+        if (!pullSuccess) {
+            // If the pull failed, we won't be able to compute a diff.
+            resetBase();
+            return;
+        }
+
         if (allData.size() == 0) {
             VLOG("Data pulled is empty");
             StatsdStats::getInstance().noteEmptyData(mPullTagId);
@@ -399,12 +406,13 @@
         // if the diff base will be cleared and this new data will serve as new diff base.
         int64_t realEventTime = allData.at(0)->GetElapsedTimestampNs();
         int64_t bucketEndTime = calcPreviousBucketEndTime(realEventTime) - 1;
-        if (bucketEndTime < mCurrentBucketStartTimeNs) {
+        bool isEventLate = bucketEndTime < mCurrentBucketStartTimeNs;
+        if (isEventLate) {
             VLOG("Skip bucket end pull due to late arrival: %lld vs %lld", (long long)bucketEndTime,
                  (long long)mCurrentBucketStartTimeNs);
             StatsdStats::getInstance().noteLateLogEventSkipped(mMetricId);
-            return;
         }
+
         for (const auto& data : allData) {
             LogEvent localCopy = data->makeCopy();
             if (mEventMatcherWizard->matchLogEvent(localCopy, mWhatMatcherIndex) ==
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h
index a8dfc5b..cab50aa 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.h
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.h
@@ -51,7 +51,8 @@
     virtual ~ValueMetricProducer();
 
     // Process data pulled on bucket boundary.
-    void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data) override;
+    void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data,
+                      bool pullSuccess) override;
 
     // ValueMetric needs special logic if it's a pulled atom.
     void notifyAppUpgrade(const int64_t& eventTimeNs, const string& apk, const int uid,
@@ -216,7 +217,9 @@
     FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBase);
     FRIEND_TEST(ValueMetricProducerTest, TestUseZeroDefaultBaseWithPullFailures);
     FRIEND_TEST(ValueMetricProducerTest, TestTrimUnusedDimensionKey);
-    FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFail);
+    FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailBeforeConditionChange);
+    FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailAfterConditionChange);
+    FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullFailAfterConditionChange_EndOfBucket);
     FRIEND_TEST(ValueMetricProducerTest, TestResetBaseOnPullTooLate);
 };
 
diff --git a/cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp b/cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp
index 0ffbb54..1725160 100644
--- a/cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp
+++ b/cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp
@@ -133,7 +133,7 @@
     event->init();
     allData.push_back(event);
 
-    gaugeProducer.onDataPulled(allData);
+    gaugeProducer.onDataPulled(allData, /** succeed */ true);
     EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
     auto it = gaugeProducer.mCurrentSlicedBucket->begin()->second.front().mFields->begin();
     EXPECT_EQ(INT, it->mValue.getType());
@@ -151,7 +151,7 @@
     event2->write(25);
     event2->init();
     allData.push_back(event2);
-    gaugeProducer.onDataPulled(allData);
+    gaugeProducer.onDataPulled(allData, /** succeed */ true);
     EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
     it = gaugeProducer.mCurrentSlicedBucket->begin()->second.front().mFields->begin();
     EXPECT_EQ(INT, it->mValue.getType());
@@ -305,7 +305,7 @@
     event->write(1);
     event->init();
     allData.push_back(event);
-    gaugeProducer.onDataPulled(allData);
+    gaugeProducer.onDataPulled(allData, /** succeed */ true);
     EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
     EXPECT_EQ(1, gaugeProducer.mCurrentSlicedBucket->begin()
                          ->second.front()
@@ -328,7 +328,7 @@
     event->write(3);
     event->init();
     allData.push_back(event);
-    gaugeProducer.onDataPulled(allData);
+    gaugeProducer.onDataPulled(allData, /** succeed */ true);
     EXPECT_EQ(2UL, gaugeProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size());
     EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
     EXPECT_EQ(3, gaugeProducer.mCurrentSlicedBucket->begin()
@@ -371,7 +371,7 @@
     event->write(1);
     event->init();
     allData.push_back(event);
-    gaugeProducer.onDataPulled(allData);
+    gaugeProducer.onDataPulled(allData, /** succeed */ true);
     EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
     EXPECT_EQ(1, gaugeProducer.mCurrentSlicedBucket->begin()
                          ->second.front()
@@ -440,7 +440,7 @@
     event->write(110);
     event->init();
     allData.push_back(event);
-    gaugeProducer.onDataPulled(allData);
+    gaugeProducer.onDataPulled(allData, /** succeed */ true);
 
     EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
     EXPECT_EQ(110, gaugeProducer.mCurrentSlicedBucket->begin()
@@ -541,7 +541,7 @@
     event->write(110);
     event->init();
     allData.push_back(event);
-    gaugeProducer.onDataPulled(allData);
+    gaugeProducer.onDataPulled(allData, /** succeed */ true);
 
     EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
     EXPECT_EQ(1UL, gaugeProducer.mPastBuckets.size());
@@ -590,7 +590,7 @@
     event1->write(13);
     event1->init();
 
-    gaugeProducer.onDataPulled({event1});
+    gaugeProducer.onDataPulled({event1}, /** succeed */ true);
     EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
     EXPECT_EQ(13L, gaugeProducer.mCurrentSlicedBucket->begin()
                            ->second.front()
@@ -604,7 +604,7 @@
     event2->write(15);
     event2->init();
 
-    gaugeProducer.onDataPulled({event2});
+    gaugeProducer.onDataPulled({event2}, /** succeed */ true);
     EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
     EXPECT_EQ(15L, gaugeProducer.mCurrentSlicedBucket->begin()
                            ->second.front()
@@ -619,7 +619,7 @@
     event3->write(26);
     event3->init();
 
-    gaugeProducer.onDataPulled({event3});
+    gaugeProducer.onDataPulled({event3}, /** succeed */ true);
     EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
     EXPECT_EQ(26L, gaugeProducer.mCurrentSlicedBucket->begin()
                            ->second.front()
@@ -633,7 +633,7 @@
             std::make_shared<LogEvent>(tagId, bucketStartTimeNs + 3 * bucketSizeNs + 10);
     event4->write("some value");
     event4->init();
-    gaugeProducer.onDataPulled({event4});
+    gaugeProducer.onDataPulled({event4}, /** succeed */ true);
     EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
     EXPECT_TRUE(gaugeProducer.mCurrentSlicedBucket->begin()->second.front().mFields->empty());
 }
diff --git a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
index c0648ee..df0ae38 100644
--- a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
+++ b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp
@@ -160,7 +160,7 @@
     event->init();
     allData.push_back(event);
 
-    valueProducer.onDataPulled(allData);
+    valueProducer.onDataPulled(allData, /** succeed */ true);
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -177,7 +177,7 @@
     event->write(23);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData);
+    valueProducer.onDataPulled(allData, /** succeed */ true);
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -196,7 +196,7 @@
     event->write(36);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData);
+    valueProducer.onDataPulled(allData, /** succeed */ true);
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
 
@@ -256,7 +256,7 @@
     event->init();
     allData.push_back(event);
 
-    valueProducer.onDataPulled(allData);
+    valueProducer.onDataPulled(allData, /** succeed */ true);
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     ValueMetricProducer::Interval curInterval =
@@ -274,7 +274,7 @@
     event->write(23);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData);
+    valueProducer.onDataPulled(allData, /** succeed */ true);
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -292,7 +292,7 @@
     event->write(36);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData);
+    valueProducer.onDataPulled(allData, /** succeed */ true);
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
 
@@ -341,7 +341,7 @@
     event->init();
     allData.push_back(event);
 
-    valueProducer.onDataPulled(allData);
+    valueProducer.onDataPulled(allData, /** succeed */ true);
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -357,7 +357,7 @@
     event->write(10);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData);
+    valueProducer.onDataPulled(allData, /** succeed */ true);
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -373,7 +373,7 @@
     event->write(36);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData);
+    valueProducer.onDataPulled(allData, /** succeed */ true);
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
     EXPECT_EQ(true, curInterval.hasBase);
@@ -420,7 +420,7 @@
     event->init();
     allData.push_back(event);
 
-    valueProducer.onDataPulled(allData);
+    valueProducer.onDataPulled(allData, /** succeed */ true);
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -436,7 +436,7 @@
     event->write(10);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData);
+    valueProducer.onDataPulled(allData, /** succeed */ true);
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -451,7 +451,7 @@
     event->write(36);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData);
+    valueProducer.onDataPulled(allData, /** succeed */ true);
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
     EXPECT_EQ(true, curInterval.hasBase);
@@ -525,7 +525,7 @@
     event->write(110);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData);
+    valueProducer.onDataPulled(allData, /** succeed */ true);
 
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
@@ -635,7 +635,7 @@
     event->init();
     allData.push_back(event);
 
-    valueProducer.onDataPulled(allData);
+    valueProducer.onDataPulled(allData, /** succeed */ true);
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
 
     valueProducer.notifyAppUpgrade(bucket2StartTimeNs + 150, "ANY.APP", 1, 1);
@@ -650,7 +650,7 @@
     event->write(150);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData);
+    valueProducer.onDataPulled(allData, /** succeed */ true);
     EXPECT_EQ(1UL, valueProducer.mPastBuckets[DEFAULT_METRIC_DIMENSION_KEY].size());
     EXPECT_EQ(bucket2StartTimeNs + 150, valueProducer.mCurrentBucketStartTimeNs);
     EXPECT_EQ(20L,
@@ -689,7 +689,7 @@
     event->init();
     allData.push_back(event);
 
-    valueProducer.onDataPulled(allData);
+    valueProducer.onDataPulled(allData, /** succeed */ true);
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
 
     valueProducer.notifyAppUpgrade(bucket2StartTimeNs + 150, "ANY.APP", 1, 1);
@@ -993,7 +993,7 @@
     event->init();
     allData.push_back(event);
 
-    valueProducer.onDataPulled(allData);
+    valueProducer.onDataPulled(allData, /** succeed */ true);
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     ValueMetricProducer::Interval curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -1011,7 +1011,7 @@
     event->write(23);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData);
+    valueProducer.onDataPulled(allData, /** succeed */ true);
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
@@ -1032,7 +1032,7 @@
     event->write(36);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData);
+    valueProducer.onDataPulled(allData, /** succeed */ true);
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
     // startUpdated:false sum:12
@@ -1120,7 +1120,7 @@
     event->write(110);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData);
+    valueProducer.onDataPulled(allData, /** succeed */ true);
 
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
     EXPECT_EQ(false, curInterval.hasBase);
@@ -1224,7 +1224,7 @@
     event->write(110);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData);
+    valueProducer.onDataPulled(allData, /** succeed */ true);
 
     curInterval = valueProducer.mCurrentSlicedBucket.begin()->second[0];
     EXPECT_EQ(true, curInterval.hasBase);
@@ -1677,7 +1677,7 @@
     allData.push_back(event1);
     allData.push_back(event2);
 
-    valueProducer.onDataPulled(allData);
+    valueProducer.onDataPulled(allData, /** succeed */ true);
     EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size());
     EXPECT_EQ(true, interval1.hasBase);
     EXPECT_EQ(11, interval1.base.long_value);
@@ -1762,7 +1762,7 @@
     allData.push_back(event1);
     allData.push_back(event2);
 
-    valueProducer.onDataPulled(allData);
+    valueProducer.onDataPulled(allData, /** succeed */ true);
     EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size());
     EXPECT_EQ(true, interval1.hasBase);
     EXPECT_EQ(11, interval1.base.long_value);
@@ -1791,7 +1791,7 @@
     event1->write(5);
     event1->init();
     allData.push_back(event1);
-    valueProducer.onDataPulled(allData);
+    valueProducer.onDataPulled(allData, /** succeed */ true);
 
     EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size());
     EXPECT_EQ(true, interval2.hasBase);
@@ -1813,7 +1813,7 @@
     event2->write(5);
     event2->init();
     allData.push_back(event2);
-    valueProducer.onDataPulled(allData);
+    valueProducer.onDataPulled(allData, /** succeed */ true);
 
     EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size());
     EXPECT_EQ(true, interval2.hasBase);
@@ -1888,7 +1888,7 @@
     allData.push_back(event1);
     allData.push_back(event2);
 
-    valueProducer.onDataPulled(allData);
+    valueProducer.onDataPulled(allData, /** succeed */ true);
     EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size());
     EXPECT_EQ(true, interval1.hasBase);
     EXPECT_EQ(11, interval1.base.long_value);
@@ -1918,7 +1918,7 @@
     event1->write(5);
     event1->init();
     allData.push_back(event1);
-    valueProducer.onDataPulled(allData);
+    valueProducer.onDataPulled(allData, /** succeed */ true);
 
     EXPECT_EQ(2UL, valueProducer.mCurrentSlicedBucket.size());
 
@@ -1941,7 +1941,7 @@
     event1->write(13);
     event1->init();
     allData.push_back(event1);
-    valueProducer.onDataPulled(allData);
+    valueProducer.onDataPulled(allData, /** succeed */ true);
 
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
     EXPECT_EQ(true, interval2.hasBase);
@@ -1951,7 +1951,60 @@
     EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
 }
 
-TEST(ValueMetricProducerTest, TestResetBaseOnPullFail) {
+TEST(ValueMetricProducerTest, TestResetBaseOnPullFailAfterConditionChange_EndOfBucket) {
+    ValueMetric metric;
+    metric.set_id(metricId);
+    metric.set_bucket(ONE_MINUTE);
+    metric.mutable_value_field()->set_field(tagId);
+    metric.mutable_value_field()->add_child()->set_field(2);
+    metric.set_condition(StringToId("SCREEN_ON"));
+    metric.set_max_pull_delay_sec(INT_MAX);
+
+    UidMap uidMap;
+    SimpleAtomMatcher atomMatcher;
+    atomMatcher.set_atom_id(tagId);
+    sp<EventMatcherWizard> eventMatcherWizard =
+            new EventMatcherWizard({new SimpleLogMatchingTracker(
+                    atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)});
+    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+    EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
+    EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return());
+
+    // Used by onConditionChanged.
+    EXPECT_CALL(*pullerManager, Pull(tagId, _))
+            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 8);
+                event->write(tagId);
+                event->write(100);
+                event->init();
+                data->push_back(event);
+                return true;
+            }));
+
+    ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, logEventMatcherIndex,
+                                      eventMatcherWizard, tagId, bucketStartTimeNs,
+                                      bucketStartTimeNs, pullerManager);
+
+    valueProducer.onConditionChanged(true, bucketStartTimeNs + 8);
+    // has one slice
+    EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+    ValueMetricProducer::Interval& curInterval =
+            valueProducer.mCurrentSlicedBucket.begin()->second[0];
+    EXPECT_EQ(true, curInterval.hasBase);
+    EXPECT_EQ(100, curInterval.base.long_value);
+    EXPECT_EQ(false, curInterval.hasValue);
+
+    vector<shared_ptr<LogEvent>> allData;
+    valueProducer.onDataPulled(allData, /** succeed */ false);
+    EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());
+    EXPECT_EQ(false, curInterval.hasBase);
+    EXPECT_EQ(false, curInterval.hasValue);
+    EXPECT_EQ(false, valueProducer.mHasGlobalBase);
+}
+
+TEST(ValueMetricProducerTest, TestResetBaseOnPullFailAfterConditionChange) {
     ValueMetric metric;
     metric.set_id(metricId);
     metric.set_bucket(ONE_MINUTE);
@@ -2007,6 +2060,53 @@
     EXPECT_EQ(false, valueProducer.mHasGlobalBase);
 }
 
+TEST(ValueMetricProducerTest, TestResetBaseOnPullFailBeforeConditionChange) {
+    ValueMetric metric;
+    metric.set_id(metricId);
+    metric.set_bucket(ONE_MINUTE);
+    metric.mutable_value_field()->set_field(tagId);
+    metric.mutable_value_field()->add_child()->set_field(2);
+    metric.set_condition(StringToId("SCREEN_ON"));
+    metric.set_max_pull_delay_sec(0);
+
+    UidMap uidMap;
+    SimpleAtomMatcher atomMatcher;
+    atomMatcher.set_atom_id(tagId);
+    sp<EventMatcherWizard> eventMatcherWizard =
+            new EventMatcherWizard({new SimpleLogMatchingTracker(
+                    atomMatcherId, logEventMatcherIndex, atomMatcher, uidMap)});
+    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
+    sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
+    EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
+    EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return());
+
+    EXPECT_CALL(*pullerManager, Pull(tagId, _))
+            .WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
+                data->clear();
+                shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 8);
+                event->write(tagId);
+                event->write(100);
+                event->init();
+                data->push_back(event);
+                return true;
+            }));
+
+    ValueMetricProducer valueProducer(kConfigKey, metric, 1, wizard, logEventMatcherIndex,
+                                      eventMatcherWizard, tagId, bucketStartTimeNs,
+                                      bucketStartTimeNs, pullerManager);
+
+    valueProducer.mCondition = true;
+
+    vector<shared_ptr<LogEvent>> allData;
+    valueProducer.onDataPulled(allData, /** succeed */ false);
+    EXPECT_EQ(0UL, valueProducer.mCurrentSlicedBucket.size());
+    ValueMetricProducer::Interval& curInterval =
+            valueProducer.mCurrentSlicedBucket.begin()->second[0];
+
+    valueProducer.onConditionChanged(false, bucket2StartTimeNs + 1);
+    EXPECT_EQ(0UL, valueProducer.mCurrentSlicedBucket.size());
+}
+
 TEST(ValueMetricProducerTest, TestResetBaseOnPullTooLate) {
     ValueMetric metric;
     metric.set_id(metricId);
@@ -2052,7 +2152,7 @@
     event->write(110);
     event->init();
     allData.push_back(event);
-    valueProducer.onDataPulled(allData);
+    valueProducer.onDataPulled(allData, /** succeed */ true);
 
     // has one slice
     EXPECT_EQ(1UL, valueProducer.mCurrentSlicedBucket.size());