ValueMetric supports multiple aggregation types

1. Add support for MIN, MAX, and AVG aggregation types (see the sketch
after the trailers below).
2. ValueMetric now also allows floats, in addition to the long data
type. AnomalyDetection still takes longs only; I am not sure it makes
sense to do anomaly detection on AVG, so I will leave that for later.
3. ValueMetric supports sliced condition changes for pushed events.
I don't think sliced condition changes make sense for pulled events,
so they are left out for now.

Test: unit test
Change-Id: I8bc510d98ea9b8a6eb16d04ff99dce6b574249cd
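
For reference, a minimal, self-contained sketch of the intended
aggregation semantics. The names here (AggregationType, Interval,
aggregate, flush) are simplified stand-ins for illustration, not the
real statsd classes, and the sketch uses double throughout while the
real code tracks long and double separately. AVG accumulates a running
sum plus a sample count and divides when the bucket is flushed:

    #include <algorithm>
    #include <cstdio>
    #include <initializer_list>

    // Simplified stand-ins; the real types live in statsd.
    enum AggregationType { SUM, MIN, MAX, AVG };

    struct Interval {
        double value = 0;    // running aggregate (a sum for SUM and AVG)
        int sampleSize = 0;  // only meaningful for AVG
        bool hasValue = false;
    };

    void aggregate(Interval& interval, double sample, AggregationType type) {
        if (!interval.hasValue) {
            interval.value = sample;
            interval.hasValue = true;
        } else {
            switch (type) {
                case SUM:
                case AVG:  // AVG sums here; the division happens at flush.
                    interval.value += sample;
                    break;
                case MIN:
                    interval.value = std::min(interval.value, sample);
                    break;
                case MAX:
                    interval.value = std::max(interval.value, sample);
                    break;
            }
        }
        interval.sampleSize += 1;
    }

    double flush(const Interval& interval, AggregationType type) {
        return type == AVG ? interval.value / interval.sampleSize
                           : interval.value;
    }

    int main() {
        Interval interval;
        for (double v : {1.0, 2.0, 6.0}) aggregate(interval, v, AVG);
        printf("%.2f\n", flush(interval, AVG));  // prints 3.00
        return 0;
    }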
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index f5e953a..c6f7bb4 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -27,7 +27,7 @@
 
 using android::util::FIELD_COUNT_REPEATED;
 using android::util::FIELD_TYPE_BOOL;
-using android::util::FIELD_TYPE_FLOAT;
+using android::util::FIELD_TYPE_DOUBLE;
 using android::util::FIELD_TYPE_INT32;
 using android::util::FIELD_TYPE_INT64;
 using android::util::FIELD_TYPE_MESSAGE;
@@ -64,7 +64,8 @@
 const int FIELD_ID_DIMENSION_LEAF_IN_WHAT = 4;
 const int FIELD_ID_DIMENSION_LEAF_IN_CONDITION = 5;
 // for ValueBucketInfo
-const int FIELD_ID_VALUE = 3;
+const int FIELD_ID_VALUE_LONG = 3;
+const int FIELD_ID_VALUE_DOUBLE = 7;
 const int FIELD_ID_BUCKET_NUM = 4;
 const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 5;
 const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 6;
@@ -79,6 +80,7 @@
       mPullerManager(pullerManager),
       mValueField(metric.value_field()),
       mPullTagId(pullTagId),
+      mIsPulled(pullTagId != -1),
       mMinBucketSizeNs(metric.min_bucket_size_nanos()),
       mDimensionSoftLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) !=
                                           StatsdStats::kAtomDimensionKeySizeLimitMap.end()
@@ -88,7 +90,9 @@
                                           StatsdStats::kAtomDimensionKeySizeLimitMap.end()
                                   ? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).second
                                   : StatsdStats::kDimensionKeySizeHardLimit),
-      mUseAbsoluteValueOnReset(metric.use_absolute_value_on_reset()) {
+      mUseAbsoluteValueOnReset(metric.use_absolute_value_on_reset()),
+      mAggregationType(metric.aggregation_type()),
+      mValueType(metric.aggregation_type() == ValueMetric::AVG ? DOUBLE : LONG) {
     int64_t bucketSizeMills = 0;
     if (metric.has_bucket()) {
         bucketSizeMills = TimeUnitToBucketSizeInMillisGuardrailed(key.GetUid(), metric.bucket());
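
AVG is the only aggregation that can yield a fractional result from
integer inputs, which is why the constructor above pins the output type
to DOUBLE for AVG and LONG for everything else. A tiny illustration of
what integer arithmetic would otherwise lose:

    #include <cstdio>

    int main() {
        long long sum = 7;  // hypothetical bucket: samples 3 and 4
        int sampleSize = 2;
        printf("%lld\n", sum / sampleSize);          // 3   (truncated)
        printf("%.1f\n", (double)sum / sampleSize);  // 3.5 (what AVG reports)
        return 0;
    }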
@@ -123,9 +127,9 @@
     mSliceByPositionALL = HasPositionALL(metric.dimensions_in_what()) ||
             HasPositionALL(metric.dimensions_in_condition());
 
-    // Kicks off the puller immediately.
     flushIfNeededLocked(startTimestampNs);
-    if (mPullTagId != -1) {
+    // Kicks off the puller immediately.
+    if (mIsPulled) {
         mPullerManager->RegisterReceiver(mPullTagId, this,
                                          mCurrentBucketStartTimeNs + mBucketSizeNs, mBucketSizeNs);
     }
@@ -136,7 +140,7 @@
 
 ValueMetricProducer::~ValueMetricProducer() {
     VLOG("~ValueMetricProducer() called");
-    if (mPullTagId != -1) {
+    if (mIsPulled) {
         mPullerManager->UnRegisterReceiver(mPullTagId, this);
     }
 }
@@ -245,11 +249,15 @@
                 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_NUM,
                                    (long long)(getBucketNumFromEndTimeNs(bucket.mBucketEndNs)));
             }
-
-            protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE, (long long)bucket.mValue);
+            if (mValueType == LONG) {
+                protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_LONG,
+                                   (long long)bucket.mValueLong);
+            } else {
+                protoOutput->write(FIELD_TYPE_DOUBLE | FIELD_ID_VALUE_DOUBLE, bucket.mValueDouble);
+            }
             protoOutput->end(bucketInfoToken);
-            VLOG("\t bucket [%lld - %lld] count: %lld", (long long)bucket.mBucketStartNs,
-                 (long long)bucket.mBucketEndNs, (long long)bucket.mValue);
+            VLOG("\t bucket [%lld - %lld] count: %lld, %.2f", (long long)bucket.mBucketStartNs,
+                 (long long)bucket.mBucketEndNs, (long long)bucket.mValueLong, bucket.mValueDouble);
         }
         protoOutput->end(wrapperToken);
     }
@@ -271,7 +279,7 @@
 
     flushIfNeededLocked(eventTimeNs);
 
-    if (mPullTagId != -1) {
+    if (mIsPulled) {
         vector<shared_ptr<LogEvent>> allData;
         if (mPullerManager->Pull(mPullTagId, eventTimeNs, &allData)) {
             if (allData.size() == 0) {
@@ -321,10 +329,10 @@
             (unsigned long)mCurrentSlicedBucket.size());
     if (verbose) {
         for (const auto& it : mCurrentSlicedBucket) {
-            fprintf(out, "\t(what)%s\t(condition)%s  (value)%lld\n",
-                it.first.getDimensionKeyInWhat().toString().c_str(),
-                it.first.getDimensionKeyInCondition().toString().c_str(),
-                (unsigned long long)it.second.sum);
+            fprintf(out, "\t(what)%s\t(condition)%s  (value)%s\n",
+                    it.first.getDimensionKeyInWhat().toString().c_str(),
+                    it.first.getDimensionKeyInCondition().toString().c_str(),
+                    it.second.value.toString().c_str());
         }
     }
 }
@@ -349,6 +357,27 @@
     return false;
 }
 
+const Value getDoubleOrLong(const Value& value) {
+    Value v;
+    switch (value.type) {
+        case INT:
+            v.setLong(value.int_value);
+            break;
+        case LONG:
+            v.setLong(value.long_value);
+            break;
+        case FLOAT:
+            v.setDouble(value.float_value);
+            break;
+        case DOUBLE:
+            v.setDouble(value.double_value);
+            break;
+        default:
+            break;
+    }
+    return v;
+}
+
 void ValueMetricProducer::onMatchedLogEventInternalLocked(
         const size_t matcherIndex, const MetricDimensionKey& eventKey,
         const ConditionKey& conditionKey, bool condition,
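
The new getDoubleOrLong helper collapses the four numeric input types
into the two types the metric actually aggregates: integers widen to
long, floating-point values widen to double. A self-contained sketch of
the same mapping, where Value is a simplified stand-in for the statsd
type (the real one keeps separate int/long/float/double members):

    #include <cstdio>

    // Simplified stand-in for statsd's Value.
    enum Type { INT, LONG, FLOAT, DOUBLE, UNKNOWN };

    struct Value {
        Type type = UNKNOWN;
        long long long_value = 0;
        double double_value = 0;
    };

    // Mirrors getDoubleOrLong: INT/LONG -> LONG, FLOAT/DOUBLE -> DOUBLE.
    Value getDoubleOrLong(const Value& value) {
        Value v;
        switch (value.type) {
            case INT:
            case LONG:
                v.type = LONG;
                v.long_value = value.long_value;
                break;
            case FLOAT:
            case DOUBLE:
                v.type = DOUBLE;
                v.double_value = value.double_value;
                break;
            default:
                break;
        }
        return v;
    }

    int main() {
        Value raw;
        raw.type = FLOAT;
        raw.double_value = 2.5;
        Value v = getDoubleOrLong(raw);
        printf("type=%d value=%.1f\n", v.type, v.double_value);  // DOUBLE, 2.5
        return 0;
    }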
@@ -367,19 +396,25 @@
     }
     Interval& interval = mCurrentSlicedBucket[eventKey];
 
-    int error = 0;
-    const int64_t value = event.GetLong(mField, &error);
-    if (error < 0) {
+    if (mField > event.size()) {
+        VLOG("Failed to extract value field %d from atom %s. %d", mField, event.ToString().c_str(),
+             (int)event.size());
         return;
     }
+    Value value = getDoubleOrLong(event.getValues()[mField - 1].mValue);
 
-    if (mPullTagId != -1) { // for pulled events
+    Value diff;
+    bool hasDiff = false;
+    if (mIsPulled) {
+        // Always require a condition for pulled events. When there is no condition, only
+        // pull on bucket boundaries, where we fake condition changes.
         if (mCondition == true) {
             if (!interval.startUpdated) {
                 interval.start = value;
                 interval.startUpdated = true;
             } else {
-                // skip it if there is already value recorded for the start
+                // Skip it if there is already a value recorded for the start. This happens
+                // when the puller takes too long to finish; we keep the previous value.
                 VLOG("Already recorded value for this dimension %s", eventKey.toString().c_str());
             }
         } else {
@@ -387,31 +422,55 @@
             // If not, take absolute value or drop it, based on config.
             if (interval.startUpdated) {
                 if (value >= interval.start) {
-                    interval.sum += (value - interval.start);
-                    interval.hasValue = true;
+                    diff = (value - interval.start);
+                    hasDiff = true;
                 } else {
                     if (mUseAbsoluteValueOnReset) {
-                        interval.sum += value;
-                        interval.hasValue = true;
+                        diff = value;
+                        hasDiff = true;
                     } else {
-                        VLOG("Dropping data for atom %d, prev: %lld, now: %lld", mPullTagId,
-                             (long long)interval.start, (long long)value);
+                        VLOG("Dropping data for atom %d, prev: %s, now: %s", mPullTagId,
+                             interval.start.toString().c_str(), value.toString().c_str());
                     }
                 }
                 interval.startUpdated = false;
             } else {
-                VLOG("No start for matching end %lld", (long long)value);
-                interval.tainted += 1;
+                VLOG("No start for matching end %s", value.toString().c_str());
             }
         }
-    } else {    // for pushed events, only accumulate when condition is true
-        if (mCondition == true || mConditionTrackerIndex < 0) {
-            interval.sum += value;
-            interval.hasValue = true;
+    } else {
+        // For pushed events, only aggregate when the sliced condition is true.
+        if (condition == true || mConditionTrackerIndex < 0) {
+            diff = value;
+            hasDiff = true;
         }
     }
+    if (hasDiff) {
+        if (interval.hasValue) {
+            switch (mAggregationType) {
+                case ValueMetric::SUM:
+                // For AVG, accumulate the sum here and take the average when flushing the bucket.
+                case ValueMetric::AVG:
+                    interval.value += diff;
+                    break;
+                case ValueMetric::MIN:
+                    interval.value = diff < interval.value ? diff : interval.value;
+                    break;
+                case ValueMetric::MAX:
+                    interval.value = diff > interval.value ? diff : interval.value;
+                    break;
+                default:
+                    break;
+            }
+        } else {
+            interval.value = diff;
+            interval.hasValue = true;
+        }
+        interval.sampleSize += 1;
+    }
 
-    long wholeBucketVal = interval.sum;
+    // TODO: propagate proper values downstream when anomaly detection supports doubles
+    long wholeBucketVal = interval.value.long_value;
     auto prev = mCurrentFullBucket.find(eventKey);
     if (prev != mCurrentFullBucket.end()) {
         wholeBucketVal += prev->second;
@@ -458,18 +517,15 @@
 
     if (info.mBucketEndNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) {
         // The current bucket is large enough to keep.
-        int tainted = 0;
         for (const auto& slice : mCurrentSlicedBucket) {
-            tainted += slice.second.tainted;
-            tainted += slice.second.startUpdated;
             if (slice.second.hasValue) {
-                info.mValue = slice.second.sum;
+                info.mValueLong = slice.second.value.long_value;
+                info.mValueDouble = (double)slice.second.value.long_value / slice.second.sampleSize;
                 // it will auto create new vector of ValuebucketInfo if the key is not found.
                 auto& bucketList = mPastBuckets[slice.first];
                 bucketList.push_back(info);
             }
         }
-        VLOG("%d tainted pairs in the bucket", tainted);
     } else {
         mSkippedBuckets.emplace_back(info.mBucketStartNs, info.mBucketEndNs);
     }
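
At flush time each kept bucket carries both representations:
mValueLong is the raw aggregate and mValueDouble divides it by the
sample count, which is only meaningful for AVG (the serialization code
picks one based on mValueType). A worked example with hypothetical
samples:

    #include <cstdio>

    int main() {
        // Hypothetical AVG bucket over samples 2, 3 and 7.
        long long sum = 2 + 3 + 7;  // interval.value accumulated as a sum
        int sampleSize = 3;

        long long mValueLong = sum;                      // raw aggregate
        double mValueDouble = (double)sum / sampleSize;  // the average

        printf("long=%lld double=%.2f\n", mValueLong, mValueDouble);  // 12, 4.00
        return 0;
    }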
@@ -478,7 +534,8 @@
         // Accumulate partial buckets with current value and then send to anomaly tracker.
         if (mCurrentFullBucket.size() > 0) {
             for (const auto& slice : mCurrentSlicedBucket) {
-                mCurrentFullBucket[slice.first] += slice.second.sum;
+                // TODO: fix this when anomaly detection can accept double values
+                mCurrentFullBucket[slice.first] += slice.second.value.long_value;
             }
             for (const auto& slice : mCurrentFullBucket) {
                 for (auto& tracker : mAnomalyTrackers) {
@@ -493,7 +550,9 @@
             for (const auto& slice : mCurrentSlicedBucket) {
                 for (auto& tracker : mAnomalyTrackers) {
                     if (tracker != nullptr) {
-                        tracker->addPastBucket(slice.first, slice.second.sum, mCurrentBucketNum);
+                        // TODO: fix this when anomaly detection can accept double values
+                        tracker->addPastBucket(slice.first, slice.second.value.long_value,
+                                               mCurrentBucketNum);
                     }
                 }
             }
@@ -501,7 +560,8 @@
     } else {
         // Accumulate partial bucket.
         for (const auto& slice : mCurrentSlicedBucket) {
-            mCurrentFullBucket[slice.first] += slice.second.sum;
+            // TODO: fix this when anomaly detection can accept double values
+            mCurrentFullBucket[slice.first] += slice.second.value.long_value;
         }
     }
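
Until the TODOs above are resolved, anomaly detection keeps consuming
value.long_value, so for AVG what reaches the trackers is the running
sum rather than the average (and doubles are not propagated at all). A
small illustration with hypothetical samples:

    #include <cstdio>

    int main() {
        // AVG over samples 4 and 10: the tracker currently sees the sum.
        long long runningSum = 4 + 10;
        int sampleSize = 2;
        printf("fed to tracker: %lld (average would be %lld)\n",
               runningSum, runningSum / sampleSize);
        return 0;
    }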