Introduces an option to set a dump latency requirement.

We are currently dumping invalid data for pulled metrics. Pulled metrics
require a new pull when flushing a bucket. We should either do another
pull or invalidate the previous bucket.

There are cases where we cannot afford to do another pull, e.g. statsd
being killed. If we do not have enough time, we'll just invalidate the
bucket to make sure we have correct data.

Bug: 123866830
Test: atest statsd_test
Change-Id: I090127cace3b7265032ebb2c9bddae976c883771
diff --git a/cmds/statsd/src/StatsLogProcessor.cpp b/cmds/statsd/src/StatsLogProcessor.cpp
index 653ef2e..a6699e7 100644
--- a/cmds/statsd/src/StatsLogProcessor.cpp
+++ b/cmds/statsd/src/StatsLogProcessor.cpp
@@ -298,7 +298,7 @@
 void StatsLogProcessor::OnConfigUpdated(const int64_t timestampNs, const ConfigKey& key,
                                         const StatsdConfig& config) {
     std::lock_guard<std::mutex> lock(mMetricsMutex);
-    WriteDataToDiskLocked(key, timestampNs, CONFIG_UPDATED);
+    WriteDataToDiskLocked(key, timestampNs, CONFIG_UPDATED, NO_TIME_CONSTRAINTS);
     OnConfigUpdatedLocked(timestampNs, key, config);
 }
 
@@ -355,6 +355,7 @@
                                      const bool include_current_partial_bucket,
                                      const bool erase_data,
                                      const DumpReportReason dumpReportReason,
+                                     const DumpLatency dumpLatency,
                                      ProtoOutputStream* proto) {
     std::lock_guard<std::mutex> lock(mMetricsMutex);
 
@@ -378,8 +379,10 @@
         // Start of ConfigMetricsReport (reports).
         uint64_t reportsToken =
                 proto->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_REPORTS);
-        onConfigMetricsReportLocked(key, dumpTimeStampNs, include_current_partial_bucket,
-                                    erase_data, dumpReportReason, proto);
+        onConfigMetricsReportLocked(key, dumpTimeStampNs,
+                                    include_current_partial_bucket,
+                                    erase_data, dumpReportReason,
+                                    dumpLatency, proto);
         proto->end(reportsToken);
         // End of ConfigMetricsReport (reports).
     } else {
@@ -394,10 +397,11 @@
                                      const bool include_current_partial_bucket,
                                      const bool erase_data,
                                      const DumpReportReason dumpReportReason,
+                                     const DumpLatency dumpLatency,
                                      vector<uint8_t>* outData) {
     ProtoOutputStream proto;
     onDumpReport(key, dumpTimeStampNs, include_current_partial_bucket, erase_data,
-                 dumpReportReason, &proto);
+                 dumpReportReason, dumpLatency, &proto);
 
     if (outData != nullptr) {
         outData->clear();
@@ -423,6 +427,7 @@
                                                     const bool include_current_partial_bucket,
                                                     const bool erase_data,
                                                     const DumpReportReason dumpReportReason,
+                                                    const DumpLatency dumpLatency,
                                                     ProtoOutputStream* proto) {
     // We already checked whether key exists in mMetricsManagers in
     // WriteDataToDisk.
@@ -438,7 +443,7 @@
     // First, fill in ConfigMetricsReport using current data on memory, which
     // starts from filling in StatsLogReport's.
     it->second->onDumpReport(dumpTimeStampNs, include_current_partial_bucket,
-                             erase_data, &str_set, proto);
+                             erase_data, dumpLatency, &str_set, proto);
 
     // Fill in UidMap if there is at least one metric to report.
     // This skips the uid map if it's an empty config.
@@ -492,7 +497,7 @@
         }
     }
     if (configKeysTtlExpired.size() > 0) {
-        WriteDataToDiskLocked(CONFIG_RESET);
+        WriteDataToDiskLocked(CONFIG_RESET, NO_TIME_CONSTRAINTS);
         resetConfigsLocked(timestampNs, configKeysTtlExpired);
     }
 }
@@ -501,7 +506,8 @@
     std::lock_guard<std::mutex> lock(mMetricsMutex);
     auto it = mMetricsManagers.find(key);
     if (it != mMetricsManagers.end()) {
-        WriteDataToDiskLocked(key, getElapsedRealtimeNs(), CONFIG_REMOVED);
+        WriteDataToDiskLocked(key, getElapsedRealtimeNs(), CONFIG_REMOVED, 
+                              NO_TIME_CONSTRAINTS);
         mMetricsManagers.erase(it);
         mUidMap->OnConfigRemoved(key);
     }
@@ -572,14 +578,15 @@
 
 void StatsLogProcessor::WriteDataToDiskLocked(const ConfigKey& key,
                                               const int64_t timestampNs,
-                                              const DumpReportReason dumpReportReason) {
+                                              const DumpReportReason dumpReportReason,
+                                              const DumpLatency dumpLatency) {
     if (mMetricsManagers.find(key) == mMetricsManagers.end() ||
         !mMetricsManagers.find(key)->second->shouldWriteToDisk()) {
         return;
     }
     ProtoOutputStream proto;
     onConfigMetricsReportLocked(key, timestampNs, true /* include_current_partial_bucket*/,
-                                true /* erase_data */, dumpReportReason, &proto);
+                                true /* erase_data */, dumpReportReason, dumpLatency, &proto);
     string file_name = StringPrintf("%s/%ld_%d_%lld", STATS_DATA_DIR,
          (long)getWallClockSec(), key.GetUid(), (long long)key.GetId());
     android::base::unique_fd fd(open(file_name.c_str(),
@@ -658,7 +665,8 @@
     StorageManager::deleteFile(file_name.c_str());
 }
 
-void StatsLogProcessor::WriteDataToDiskLocked(const DumpReportReason dumpReportReason) {
+void StatsLogProcessor::WriteDataToDiskLocked(const DumpReportReason dumpReportReason,
+                                              const DumpLatency dumpLatency) {
     const int64_t timeNs = getElapsedRealtimeNs();
     // Do not write to disk if we already have in the last few seconds.
     // This is to avoid overwriting files that would have the same name if we
@@ -671,13 +679,14 @@
     }
     mLastWriteTimeNs = timeNs;
     for (auto& pair : mMetricsManagers) {
-        WriteDataToDiskLocked(pair.first, timeNs, dumpReportReason);
+        WriteDataToDiskLocked(pair.first, timeNs, dumpReportReason, dumpLatency);
     }
 }
 
-void StatsLogProcessor::WriteDataToDisk(const DumpReportReason dumpReportReason) {
+void StatsLogProcessor::WriteDataToDisk(const DumpReportReason dumpReportReason, 
+                                        const DumpLatency dumpLatency) {
     std::lock_guard<std::mutex> lock(mMetricsMutex);
-    WriteDataToDiskLocked(dumpReportReason);
+    WriteDataToDiskLocked(dumpReportReason, dumpLatency);
 }
 
 void StatsLogProcessor::informPullAlarmFired(const int64_t timestampNs) {
diff --git a/cmds/statsd/src/StatsLogProcessor.h b/cmds/statsd/src/StatsLogProcessor.h
index ea9c6e7..e92b897 100644
--- a/cmds/statsd/src/StatsLogProcessor.h
+++ b/cmds/statsd/src/StatsLogProcessor.h
@@ -66,10 +66,14 @@
 
     void onDumpReport(const ConfigKey& key, const int64_t dumpTimeNs,
                       const bool include_current_partial_bucket, const bool erase_data,
-                      const DumpReportReason dumpReportReason, vector<uint8_t>* outData);
+                      const DumpReportReason dumpReportReason, 
+                      const DumpLatency dumpLatency,
+                      vector<uint8_t>* outData);
     void onDumpReport(const ConfigKey& key, const int64_t dumpTimeNs,
                       const bool include_current_partial_bucket, const bool erase_data,
-                      const DumpReportReason dumpReportReason, ProtoOutputStream* proto);
+                      const DumpReportReason dumpReportReason,
+                      const DumpLatency dumpLatency,
+                      ProtoOutputStream* proto);
 
     /* Tells MetricsManager that the alarms in alarmSet have fired. Modifies anomaly alarmSet. */
     void onAnomalyAlarmFired(
@@ -82,7 +86,8 @@
             unordered_set<sp<const InternalAlarm>, SpHash<InternalAlarm>> alarmSet);
 
     /* Flushes data to disk. Data on memory will be gone after written to disk. */
-    void WriteDataToDisk(const DumpReportReason dumpReportReason);
+    void WriteDataToDisk(const DumpReportReason dumpReportReason,
+                         const DumpLatency dumpLatency);
 
     /* Persist metric activation status onto disk. */
     void WriteMetricsActivationToDisk(int64_t currentTimeNs);
@@ -153,14 +158,17 @@
 
     void GetActiveConfigsLocked(const int uid, vector<int64_t>& outActiveConfigs);
 
-    void WriteDataToDiskLocked(const DumpReportReason dumpReportReason);
+    void WriteDataToDiskLocked(const DumpReportReason dumpReportReason,
+                               const DumpLatency dumpLatency);
     void WriteDataToDiskLocked(const ConfigKey& key, const int64_t timestampNs,
-                               const DumpReportReason dumpReportReason);
+                               const DumpReportReason dumpReportReason,
+                               const DumpLatency dumpLatency);
 
     void onConfigMetricsReportLocked(const ConfigKey& key, const int64_t dumpTimeStampNs,
                                      const bool include_current_partial_bucket,
                                      const bool erase_data,
                                      const DumpReportReason dumpReportReason,
+                                     const DumpLatency dumpLatency,
                                      util::ProtoOutputStream* proto);
 
     /* Check if we should send a broadcast if approaching memory limits and if we're over, we
diff --git a/cmds/statsd/src/StatsService.cpp b/cmds/statsd/src/StatsService.cpp
index c542b62..4deb8bd 100644
--- a/cmds/statsd/src/StatsService.cpp
+++ b/cmds/statsd/src/StatsService.cpp
@@ -313,7 +313,9 @@
                 proto.start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_REPORTS_LIST);
         mProcessor->onDumpReport(configKey, getElapsedRealtimeNs(),
                                  true /* includeCurrentBucket */, false /* erase_data */,
-                                 ADB_DUMP, &proto);
+                                 ADB_DUMP,
+                                 FAST,
+                                 &proto);
         proto.end(reportsListToken);
         proto.flush(out);
         proto.clear();
@@ -694,7 +696,9 @@
         if (good) {
             vector<uint8_t> data;
             mProcessor->onDumpReport(ConfigKey(uid, StrToInt64(name)), getElapsedRealtimeNs(),
-                                     includeCurrentBucket, eraseData, ADB_DUMP, &data);
+                                     includeCurrentBucket, eraseData, ADB_DUMP,
+                                     NO_TIME_CONSTRAINTS,
+                                     &data);
             if (proto) {
                 for (size_t i = 0; i < data.size(); i ++) {
                     dprintf(out, "%c", data[i]);
@@ -758,7 +762,7 @@
 
 status_t StatsService::cmd_write_data_to_disk(int out) {
     dprintf(out, "Writing data to disk\n");
-    mProcessor->WriteDataToDisk(ADB_DUMP);
+    mProcessor->WriteDataToDisk(ADB_DUMP, NO_TIME_CONSTRAINTS);
     return NO_ERROR;
 }
 
@@ -958,7 +962,7 @@
 Status StatsService::informDeviceShutdown() {
     ENFORCE_UID(AID_SYSTEM);
     VLOG("StatsService::informDeviceShutdown");
-    mProcessor->WriteDataToDisk(DEVICE_SHUTDOWN);
+    mProcessor->WriteDataToDisk(DEVICE_SHUTDOWN, FAST);
     mProcessor->WriteMetricsActivationToDisk(getElapsedRealtimeNs());
     return Status::ok();
 }
@@ -1000,7 +1004,7 @@
 void StatsService::Terminate() {
     ALOGI("StatsService::Terminating");
     if (mProcessor != nullptr) {
-        mProcessor->WriteDataToDisk(TERMINATION_SIGNAL_RECEIVED);
+        mProcessor->WriteDataToDisk(TERMINATION_SIGNAL_RECEIVED, FAST);
     }
 }
 
@@ -1017,8 +1021,10 @@
     IPCThreadState* ipc = IPCThreadState::self();
     VLOG("StatsService::getData with Pid %i, Uid %i", ipc->getCallingPid(), ipc->getCallingUid());
     ConfigKey configKey(ipc->getCallingUid(), key);
+    // The dump latency does not matter here: since we do not include the current bucket, we do
+    // not need to pull any new data anyhow.
     mProcessor->onDumpReport(configKey, getElapsedRealtimeNs(), false /* include_current_bucket*/,
-                             true /* erase_data */, GET_DATA_CALLED, output);
+                             true /* erase_data */, GET_DATA_CALLED, FAST, output);
     return Status::ok();
 }
 
@@ -1312,7 +1318,7 @@
     StatsdStats::getInstance().noteSystemServerRestart(getWallClockSec());
     if (mProcessor != nullptr) {
         ALOGW("Reset statsd upon system server restarts.");
-        mProcessor->WriteDataToDisk(STATSCOMPANION_DIED);
+        mProcessor->WriteDataToDisk(STATSCOMPANION_DIED, FAST);
         mProcessor->resetConfigs();
     }
     mAnomalyAlarmMonitor->setStatsCompanionService(nullptr);
diff --git a/cmds/statsd/src/metrics/CountMetricProducer.cpp b/cmds/statsd/src/metrics/CountMetricProducer.cpp
index e84f88d..96d1447 100644
--- a/cmds/statsd/src/metrics/CountMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/CountMetricProducer.cpp
@@ -139,13 +139,13 @@
 
 
 void CountMetricProducer::clearPastBucketsLocked(const int64_t dumpTimeNs) {
-    flushIfNeededLocked(dumpTimeNs);
     mPastBuckets.clear();
 }
 
 void CountMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
                                              const bool include_current_partial_bucket,
                                              const bool erase_data,
+                                             const DumpLatency dumpLatency,
                                              std::set<string> *str_set,
                                              ProtoOutputStream* protoOutput) {
     if (include_current_partial_bucket) {
@@ -319,7 +319,7 @@
         return;
     }
 
-    flushCurrentBucketLocked(eventTimeNs);
+    flushCurrentBucketLocked(eventTimeNs, eventTimeNs);
     // Setup the bucket start time and number.
     int64_t numBucketsForward = 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs;
     mCurrentBucketStartTimeNs = currentBucketEndTimeNs + (numBucketsForward - 1) * mBucketSizeNs;
@@ -328,7 +328,8 @@
          (long long)mCurrentBucketStartTimeNs);
 }
 
-void CountMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) {
+void CountMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs,
+                                                   const int64_t& nextBucketStartTimeNs) {
     int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
     CountBucket info;
     info.mBucketStartNs = mCurrentBucketStartTimeNs;
diff --git a/cmds/statsd/src/metrics/CountMetricProducer.h b/cmds/statsd/src/metrics/CountMetricProducer.h
index 1ac44264..b4a910c 100644
--- a/cmds/statsd/src/metrics/CountMetricProducer.h
+++ b/cmds/statsd/src/metrics/CountMetricProducer.h
@@ -57,6 +57,7 @@
     void onDumpReportLocked(const int64_t dumpTimeNs,
                             const bool include_current_partial_bucket,
                             const bool erase_data,
+                            const DumpLatency dumpLatency,
                             std::set<string> *str_set,
                             android::util::ProtoOutputStream* protoOutput) override;
 
@@ -78,7 +79,8 @@
     // Util function to flush the old packet.
     void flushIfNeededLocked(const int64_t& newEventTime) override;
 
-    void flushCurrentBucketLocked(const int64_t& eventTimeNs) override;
+    void flushCurrentBucketLocked(const int64_t& eventTimeNs,
+                                  const int64_t& nextBucketStartTimeNs) override;
 
     std::unordered_map<MetricDimensionKey, std::vector<CountBucket>> mPastBuckets;
 
diff --git a/cmds/statsd/src/metrics/DurationMetricProducer.cpp b/cmds/statsd/src/metrics/DurationMetricProducer.cpp
index da6b97c..5e4594b 100644
--- a/cmds/statsd/src/metrics/DurationMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/DurationMetricProducer.cpp
@@ -456,6 +456,7 @@
 void DurationMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
                                                 const bool include_current_partial_bucket,
                                                 const bool erase_data,
+                                                const DumpLatency dumpLatency,
                                                 std::set<string> *str_set,
                                                 ProtoOutputStream* protoOutput) {
     if (include_current_partial_bucket) {
@@ -581,7 +582,8 @@
     mCurrentBucketNum += numBucketsForward;
 }
 
-void DurationMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) {
+void DurationMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs,
+                                                      const int64_t& nextBucketStartTimeNs) {
     for (auto whatIt = mCurrentSlicedDurationTrackerMap.begin();
             whatIt != mCurrentSlicedDurationTrackerMap.end();) {
         for (auto it = whatIt->second.begin(); it != whatIt->second.end();) {
diff --git a/cmds/statsd/src/metrics/DurationMetricProducer.h b/cmds/statsd/src/metrics/DurationMetricProducer.h
index ec561b5..f711df2 100644
--- a/cmds/statsd/src/metrics/DurationMetricProducer.h
+++ b/cmds/statsd/src/metrics/DurationMetricProducer.h
@@ -64,6 +64,7 @@
     void onDumpReportLocked(const int64_t dumpTimeNs,
                             const bool include_current_partial_bucket,
                             const bool erase_data,
+                            const DumpLatency dumpLatency,
                             std::set<string> *str_set,
                             android::util::ProtoOutputStream* protoOutput) override;
 
@@ -88,7 +89,8 @@
     // Util function to flush the old packet.
     void flushIfNeededLocked(const int64_t& eventTime);
 
-    void flushCurrentBucketLocked(const int64_t& eventTimeNs) override;
+    void flushCurrentBucketLocked(const int64_t& eventTimeNs,
+                                  const int64_t& nextBucketStartTimeNs) override;
 
     const DurationMetric_AggregationType mAggregationType;
 
diff --git a/cmds/statsd/src/metrics/EventMetricProducer.cpp b/cmds/statsd/src/metrics/EventMetricProducer.cpp
index 3b4af65..5435c84 100644
--- a/cmds/statsd/src/metrics/EventMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/EventMetricProducer.cpp
@@ -108,6 +108,7 @@
 void EventMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
                                              const bool include_current_partial_bucket,
                                              const bool erase_data,
+                                             const DumpLatency dumpLatency,
                                              std::set<string> *str_set,
                                              ProtoOutputStream* protoOutput) {
     protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId);
diff --git a/cmds/statsd/src/metrics/EventMetricProducer.h b/cmds/statsd/src/metrics/EventMetricProducer.h
index 96adfdd..74e6bc8 100644
--- a/cmds/statsd/src/metrics/EventMetricProducer.h
+++ b/cmds/statsd/src/metrics/EventMetricProducer.h
@@ -48,6 +48,7 @@
     void onDumpReportLocked(const int64_t dumpTimeNs,
                             const bool include_current_partial_bucket,
                             const bool erase_data,
+                            const DumpLatency dumpLatency,
                             std::set<string> *str_set,
                             android::util::ProtoOutputStream* protoOutput) override;
     void clearPastBucketsLocked(const int64_t dumpTimeNs) override;
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
index 6301793..7b001b3 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp
@@ -184,6 +184,7 @@
 void GaugeMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
                                              const bool include_current_partial_bucket,
                                              const bool erase_data,
+                                             const DumpLatency dumpLatency,
                                              std::set<string> *str_set,
                                              ProtoOutputStream* protoOutput) {
     VLOG("Gauge metric %lld report now...", (long long)mMetricId);
@@ -528,7 +529,7 @@
         return;
     }
 
-    flushCurrentBucketLocked(eventTimeNs);
+    flushCurrentBucketLocked(eventTimeNs, eventTimeNs);
 
     // Adjusts the bucket start and end times.
     int64_t numBucketsForward = 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs;
@@ -538,7 +539,8 @@
          (long long)mCurrentBucketStartTimeNs);
 }
 
-void GaugeMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) {
+void GaugeMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs,
+                                                   const int64_t& nextBucketStartTimeNs) {
     int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
 
     GaugeBucket info;
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.h b/cmds/statsd/src/metrics/GaugeMetricProducer.h
index 64a1833..9b99fb1 100644
--- a/cmds/statsd/src/metrics/GaugeMetricProducer.h
+++ b/cmds/statsd/src/metrics/GaugeMetricProducer.h
@@ -82,8 +82,7 @@
             // Flush full buckets on the normal path up to the latest bucket boundary.
             flushIfNeededLocked(eventTimeNs);
         }
-        flushCurrentBucketLocked(eventTimeNs);
-        mCurrentBucketStartTimeNs = eventTimeNs;
+        flushCurrentBucketLocked(eventTimeNs, eventTimeNs);
         if (mIsPulled && mSamplingType == GaugeMetric::RANDOM_ONE_SAMPLE) {
             pullAndMatchEventsLocked(eventTimeNs);
         }
@@ -99,6 +98,7 @@
     void onDumpReportLocked(const int64_t dumpTimeNs,
                             const bool include_current_partial_bucket,
                             const bool erase_data,
+                            const DumpLatency dumpLatency,
                             std::set<string> *str_set,
                             android::util::ProtoOutputStream* protoOutput) override;
     void clearPastBucketsLocked(const int64_t dumpTimeNs) override;
@@ -119,7 +119,8 @@
     // Util function to flush the old packet.
     void flushIfNeededLocked(const int64_t& eventTime) override;
 
-    void flushCurrentBucketLocked(const int64_t& eventTimeNs) override;
+    void flushCurrentBucketLocked(const int64_t& eventTimeNs,
+                                  const int64_t& nextBucketStartTimeNs) override;
 
     void pullAndMatchEventsLocked(const int64_t timestampNs);
 
diff --git a/cmds/statsd/src/metrics/MetricProducer.h b/cmds/statsd/src/metrics/MetricProducer.h
index 8ab3b06..99cb5d4 100644
--- a/cmds/statsd/src/metrics/MetricProducer.h
+++ b/cmds/statsd/src/metrics/MetricProducer.h
@@ -46,6 +46,16 @@
     kActiveOnBoot = 2,
 };
 
+enum DumpLatency {
+    // In some cases, we only have a short time range to do the dump, e.g. statsd is being killed.
+    // We might not be able to return all the data in this mode. For instance, pull metrics might
+    // need to be pulled when the current bucket is requested.
+    FAST = 1,
+    // In other cases, it is fine for a dump to take more than a few milliseconds, e.g. config
+    // updates.
+    NO_TIME_CONSTRAINTS = 2
+};
+
 // A MetricProducer is responsible for compute one single metrics, creating stats log report, and
 // writing the report to dropbox. MetricProducers should respond to package changes as required in
 // PackageInfoListener, but if none of the metrics are slicing by package name, then the update can
@@ -87,8 +97,7 @@
             flushIfNeededLocked(eventTimeNs);
         }
         // Now flush a partial bucket.
-        flushCurrentBucketLocked(eventTimeNs);
-        mCurrentBucketStartTimeNs = eventTimeNs;
+        flushCurrentBucketLocked(eventTimeNs, eventTimeNs);
         // Don't update the current bucket number so that the anomaly tracker knows this bucket
         // is a partial bucket and can merge it with the previous bucket.
     };
@@ -135,11 +144,12 @@
     void onDumpReport(const int64_t dumpTimeNs,
                       const bool include_current_partial_bucket,
                       const bool erase_data,
+                      const DumpLatency dumpLatency,
                       std::set<string> *str_set,
                       android::util::ProtoOutputStream* protoOutput) {
         std::lock_guard<std::mutex> lock(mMutex);
         return onDumpReportLocked(dumpTimeNs, include_current_partial_bucket, erase_data,
-                str_set, protoOutput);
+                dumpLatency, str_set, protoOutput);
     }
 
     void clearPastBuckets(const int64_t dumpTimeNs) {
@@ -239,6 +249,7 @@
     virtual void onDumpReportLocked(const int64_t dumpTimeNs,
                                     const bool include_current_partial_bucket,
                                     const bool erase_data,
+                                    const DumpLatency dumpLatency,
                                     std::set<string> *str_set,
                                     android::util::ProtoOutputStream* protoOutput) = 0;
     virtual void clearPastBucketsLocked(const int64_t dumpTimeNs) = 0;
@@ -270,7 +281,7 @@
      */
     virtual void flushLocked(const int64_t& eventTimeNs) {
         flushIfNeededLocked(eventTimeNs);
-        flushCurrentBucketLocked(eventTimeNs);
+        flushCurrentBucketLocked(eventTimeNs, eventTimeNs);
     };
 
     /**
@@ -283,7 +294,8 @@
      * flushIfNeededLocked or the app upgrade handler; the caller MUST update the bucket timestamp
      * and bucket number as needed.
      */
-    virtual void flushCurrentBucketLocked(const int64_t& eventTimeNs){};
+    virtual void flushCurrentBucketLocked(const int64_t& eventTimeNs,
+                                          const int64_t& nextBucketStartTimeNs) {};
 
     // Convenience to compute the current bucket's end time, which is always aligned with the
     // start time of the metric.
diff --git a/cmds/statsd/src/metrics/MetricsManager.cpp b/cmds/statsd/src/metrics/MetricsManager.cpp
index 4851a8d..4b3bfd3 100644
--- a/cmds/statsd/src/metrics/MetricsManager.cpp
+++ b/cmds/statsd/src/metrics/MetricsManager.cpp
@@ -217,6 +217,7 @@
 void MetricsManager::onDumpReport(const int64_t dumpTimeStampNs,
                                   const bool include_current_partial_bucket,
                                   const bool erase_data,
+                                  const DumpLatency dumpLatency,
                                   std::set<string> *str_set,
                                   ProtoOutputStream* protoOutput) {
     VLOG("=========================Metric Reports Start==========================");
@@ -227,10 +228,10 @@
                     FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_METRICS);
             if (mHashStringsInReport) {
                 producer->onDumpReport(dumpTimeStampNs, include_current_partial_bucket, erase_data,
-                                       str_set, protoOutput);
+                                       dumpLatency, str_set, protoOutput);
             } else {
                 producer->onDumpReport(dumpTimeStampNs, include_current_partial_bucket, erase_data,
-                                       nullptr, protoOutput);
+                                       dumpLatency, nullptr, protoOutput);
             }
             protoOutput->end(token);
         } else {
diff --git a/cmds/statsd/src/metrics/MetricsManager.h b/cmds/statsd/src/metrics/MetricsManager.h
index eab1f76..3904460 100644
--- a/cmds/statsd/src/metrics/MetricsManager.h
+++ b/cmds/statsd/src/metrics/MetricsManager.h
@@ -121,6 +121,7 @@
     virtual void onDumpReport(const int64_t dumpTimeNs,
                               const bool include_current_partial_bucket,
                               const bool erase_data,
+                              const DumpLatency dumpLatency,
                               std::set<string> *str_set,
                               android::util::ProtoOutputStream* protoOutput);
 
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index 3cf378d..e94b75c 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -188,13 +188,28 @@
 void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
                                              const bool include_current_partial_bucket,
                                              const bool erase_data,
+                                             const DumpLatency dumpLatency,
                                              std::set<string> *str_set,
                                              ProtoOutputStream* protoOutput) {
     VLOG("metric %lld dump report now...", (long long)mMetricId);
+    flushIfNeededLocked(dumpTimeNs);
     if (include_current_partial_bucket) {
-        flushLocked(dumpTimeNs);
-    } else {
-        flushIfNeededLocked(dumpTimeNs);
+        // For pull metrics, we need to do a pull at bucket boundaries. If we do not do that the
+        // current bucket will have incomplete data and the next will have the wrong snapshot to do
+        // a diff against. If the condition is false, we are fine since the base data is reset and
+        // we are not tracking anything.
+        bool pullNeeded = mIsPulled && mCondition == ConditionState::kTrue;
+        if (pullNeeded) {
+            switch (dumpLatency) {
+                case FAST:
+                    invalidateCurrentBucket();
+                    break;
+                case NO_TIME_CONSTRAINTS:
+                    pullAndMatchEventsLocked(dumpTimeNs);
+                    break;
+            }
+        } 
+        flushCurrentBucketLocked(dumpTimeNs, dumpTimeNs);
     }
     protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId);
     protoOutput->write(FIELD_TYPE_BOOL | FIELD_ID_IS_ACTIVE, isActiveLocked());
@@ -712,23 +727,21 @@
         return;
     }
 
-    flushCurrentBucketLocked(eventTimeNs);
-
     int64_t numBucketsForward = 1 + (eventTimeNs - currentBucketEndTimeNs) / mBucketSizeNs;
-    mCurrentBucketStartTimeNs = currentBucketEndTimeNs + (numBucketsForward - 1) * mBucketSizeNs;
-    mCurrentBucketNum += numBucketsForward;
+    int64_t nextBucketStartTimeNs = currentBucketEndTimeNs + (numBucketsForward - 1) * mBucketSizeNs;
+    flushCurrentBucketLocked(eventTimeNs, nextBucketStartTimeNs);
 
+    mCurrentBucketNum += numBucketsForward;
     if (numBucketsForward > 1) {
         VLOG("Skipping forward %lld buckets", (long long)numBucketsForward);
         StatsdStats::getInstance().noteSkippedForwardBuckets(mMetricId);
         // take base again in future good bucket.
         resetBase();
     }
-    VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId,
-         (long long)mCurrentBucketStartTimeNs);
 }
 
-void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) {
+void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs,
+                                                   const int64_t& nextBucketStartTimeNs) {
     if (mCondition == ConditionState::kUnknown) {
         StatsdStats::getInstance().noteBucketUnknownCondition(mMetricId);
     }
@@ -758,6 +771,9 @@
     }
     initCurrentSlicedBucket();
     mCurrentBucketIsInvalid = false;
+    mCurrentBucketStartTimeNs = nextBucketStartTimeNs;
+    VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId,
+         (long long)mCurrentBucketStartTimeNs);
 }
 
 ValueBucket ValueMetricProducer::buildPartialBucket(int64_t bucketEndTime,
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h
index f26ad85..696d4fa 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.h
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.h
@@ -65,8 +65,7 @@
         if (mIsPulled && mCondition) {
             pullAndMatchEventsLocked(eventTimeNs - 1);
         }
-        flushCurrentBucketLocked(eventTimeNs);
-        mCurrentBucketStartTimeNs = eventTimeNs;
+        flushCurrentBucketLocked(eventTimeNs, eventTimeNs);
     };
 
 protected:
@@ -79,6 +78,7 @@
     void onDumpReportLocked(const int64_t dumpTimeNs,
                             const bool include_current_partial_bucket,
                             const bool erase_data,
+                            const DumpLatency dumpLatency,
                             std::set<string> *str_set,
                             android::util::ProtoOutputStream* protoOutput) override;
     void clearPastBucketsLocked(const int64_t dumpTimeNs) override;
@@ -97,7 +97,8 @@
     // Util function to flush the old packet.
     void flushIfNeededLocked(const int64_t& eventTime) override;
 
-    void flushCurrentBucketLocked(const int64_t& eventTimeNs) override;
+    void flushCurrentBucketLocked(const int64_t& eventTimeNs,
+                                  const int64_t& nextBucketStartTimeNs) override;
 
     void dropDataLocked(const int64_t dropTimeNs) override;