multi-value aggregation in ValueMetric

Allow aggregation on multiple fields, instead of one at a time.
All these fields should use the same aggregation type, use_diff,
direction, etc.
The config reuses value_field but allows multiple fields to be
specified.
The order in which they are specified determines the "index" of a
value in the output.

Bug: 119217634
Test: unit test
Change-Id: I38b1465d13723a897b30ee0b4f868498f60ad4db
diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
index c8b1cf0..7250b17 100644
--- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp
+++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp
@@ -64,8 +64,10 @@
 const int FIELD_ID_DIMENSION_LEAF_IN_WHAT = 4;
 const int FIELD_ID_DIMENSION_LEAF_IN_CONDITION = 5;
 // for ValueBucketInfo
-const int FIELD_ID_VALUE_LONG = 7;
-const int FIELD_ID_VALUE_DOUBLE = 8;
+const int FIELD_ID_VALUE_INDEX = 1;
+const int FIELD_ID_VALUE_LONG = 2;
+const int FIELD_ID_VALUE_DOUBLE = 3;
+const int FIELD_ID_VALUES = 9;
 const int FIELD_ID_BUCKET_NUM = 4;
 const int FIELD_ID_START_BUCKET_ELAPSED_MILLIS = 5;
 const int FIELD_ID_END_BUCKET_ELAPSED_MILLIS = 6;
@@ -78,7 +80,6 @@
                                          const sp<StatsPullerManager>& pullerManager)
     : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, wizard),
       mPullerManager(pullerManager),
-      mValueField(metric.value_field()),
       mPullTagId(pullTagId),
       mIsPulled(pullTagId != -1),
       mMinBucketSizeNs(metric.min_bucket_size_nanos()),
@@ -103,6 +104,9 @@
     }
 
     mBucketSizeNs = bucketSizeMills * 1000000;
+
+    translateFieldMatcher(metric.value_field(), &mFieldMatchers);
+
     if (metric.has_dimensions_in_what()) {
         translateFieldMatcher(metric.dimensions_in_what(), &mDimensionsInWhat);
         mContainANYPositionInDimensionsInWhat = HasPositionANY(metric.dimensions_in_what());
@@ -122,9 +126,6 @@
         }
     }
 
-    if (mValueField.child_size() > 0) {
-        mField = mValueField.child(0).field();
-    }
     mConditionSliced = (metric.links().size() > 0) || (mDimensionsInCondition.size() > 0);
     mSliceByPositionALL = HasPositionALL(metric.dimensions_in_what()) ||
                           HasPositionALL(metric.dimensions_in_condition());
@@ -259,18 +260,27 @@
                 protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_BUCKET_NUM,
                                    (long long)(getBucketNumFromEndTimeNs(bucket.mBucketEndNs)));
             }
-            if (bucket.value.getType() == LONG) {
-                protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_LONG,
-                                   (long long)bucket.value.long_value);
-                VLOG("\t bucket [%lld - %lld] count: %lld", (long long)bucket.mBucketStartNs,
-                     (long long)bucket.mBucketEndNs, (long long)bucket.value.long_value);
-            } else if (bucket.value.getType() == DOUBLE) {
-                protoOutput->write(FIELD_TYPE_DOUBLE | FIELD_ID_VALUE_DOUBLE,
-                                   bucket.value.double_value);
-                VLOG("\t bucket [%lld - %lld] count: %.2f", (long long)bucket.mBucketStartNs,
-                     (long long)bucket.mBucketEndNs, bucket.value.double_value);
-            } else {
-                VLOG("Wrong value type for ValueMetric output: %d", bucket.value.getType());
+            for (int i = 0; i < (int)bucket.valueIndex.size(); i ++) {
+                int index = bucket.valueIndex[i];
+                const Value& value = bucket.values[i];
+                uint64_t valueToken = protoOutput->start(
+                        FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_VALUES);
+                protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_VALUE_INDEX,
+                                   index);
+                if (value.getType() == LONG) {
+                    protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_VALUE_LONG,
+                                       (long long)value.long_value);
+                    VLOG("\t bucket [%lld - %lld] value %d: %lld", (long long)bucket.mBucketStartNs,
+                         (long long)bucket.mBucketEndNs, index, (long long)value.long_value);
+                } else if (value.getType() == DOUBLE) {
+                    protoOutput->write(FIELD_TYPE_DOUBLE | FIELD_ID_VALUE_DOUBLE,
+                                       value.double_value);
+                    VLOG("\t bucket [%lld - %lld] value %d: %.2f", (long long)bucket.mBucketStartNs,
+                         (long long)bucket.mBucketEndNs, index, value.double_value);
+                } else {
+                    VLOG("Wrong value type for ValueMetric output: %d", value.getType());
+                }
+                protoOutput->end(valueToken);
             }
             protoOutput->end(bucketInfoToken);
         }
@@ -303,7 +313,9 @@
     // when condition change from true to false, clear diff base
     if (mUseDiff && mCondition && !condition) {
         for (auto& slice : mCurrentSlicedBucket) {
-            slice.second.hasBase = false;
+            for (auto& interval : slice.second) {
+                interval.hasBase = false;
+            }
         }
     }
 
@@ -363,10 +375,12 @@
             (unsigned long)mCurrentSlicedBucket.size());
     if (verbose) {
         for (const auto& it : mCurrentSlicedBucket) {
+          for (const auto& interval : it.second) {
             fprintf(out, "\t(what)%s\t(condition)%s  (value)%s\n",
                     it.first.getDimensionKeyInWhat().toString().c_str(),
                     it.first.getDimensionKeyInCondition().toString().c_str(),
-                    it.second.value.toString().c_str());
+                    interval.value.toString().c_str());
+          }
         }
     }
 }
@@ -391,25 +405,29 @@
     return false;
 }
 
-const Value getDoubleOrLong(const Value& value) {
-    Value v;
-    switch (value.type) {
-        case INT:
-            v.setLong(value.int_value);
-            break;
-        case LONG:
-            v.setLong(value.long_value);
-            break;
-        case FLOAT:
-            v.setDouble(value.float_value);
-            break;
-        case DOUBLE:
-            v.setDouble(value.double_value);
-            break;
-        default:
-            break;
+bool getDoubleOrLong(const LogEvent& event, const Matcher& matcher, Value& ret) {
+    for (const FieldValue& value : event.getValues()) {
+        if (value.mField.matches(matcher)) {
+            switch (value.mValue.type) {
+                case INT:
+                    ret.setLong(value.mValue.int_value);
+                    break;
+                case LONG:
+                    ret.setLong(value.mValue.long_value);
+                    break;
+                case FLOAT:
+                    ret.setDouble(value.mValue.float_value);
+                    break;
+                case DOUBLE:
+                    ret.setDouble(value.mValue.double_value);
+                    break;
+                default:
+                    break;
+            }
+            return true;
+        }
     }
-    return v;
+    return false;
 }
 
 void ValueMetricProducer::onMatchedLogEventInternalLocked(const size_t matcherIndex,
@@ -436,82 +454,90 @@
     if (hitGuardRailLocked(eventKey)) {
         return;
     }
-    Interval& interval = mCurrentSlicedBucket[eventKey];
-
-    if (mField > event.size()) {
-        VLOG("Failed to extract value field %d from atom %s. %d", mField, event.ToString().c_str(),
-             (int)event.size());
-        return;
+    vector<Interval>& multiIntervals = mCurrentSlicedBucket[eventKey];
+    if (multiIntervals.size() < mFieldMatchers.size()) {
+        VLOG("Resizing number of intervals to %d", (int)mFieldMatchers.size());
+        multiIntervals.resize(mFieldMatchers.size());
     }
-    Value value = getDoubleOrLong(event.getValues()[mField - 1].mValue);
 
-    if (mUseDiff) {
-        // no base. just update base and return.
-        if (!interval.hasBase) {
-            interval.base = value;
-            interval.hasBase = true;
+    for (int i = 0; i < (int)mFieldMatchers.size(); i++) {
+        const Matcher& matcher = mFieldMatchers[i];
+        Interval& interval = multiIntervals[i];
+        interval.valueIndex = i;
+        Value value;
+        if (!getDoubleOrLong(event, matcher, value)) {
+            VLOG("Failed to get value %d from event %s", i, event.ToString().c_str());
             return;
         }
-        Value diff;
-        switch (mValueDirection) {
-            case ValueMetric::INCREASING:
-                if (value >= interval.base) {
-                    diff = value - interval.base;
-                } else if (mUseAbsoluteValueOnReset) {
-                    diff = value;
-                } else {
-                    VLOG("Unexpected decreasing value");
-                    StatsdStats::getInstance().notePullDataError(mPullTagId);
-                    interval.base = value;
-                    return;
-                }
-                break;
-            case ValueMetric::DECREASING:
-                if (interval.base >= value) {
-                    diff = interval.base - value;
-                } else if (mUseAbsoluteValueOnReset) {
-                    diff = value;
-                } else {
-                    VLOG("Unexpected increasing value");
-                    StatsdStats::getInstance().notePullDataError(mPullTagId);
-                    interval.base = value;
-                    return;
-                }
-                break;
-            case ValueMetric::ANY:
-                diff = value - interval.base;
-                break;
-            default:
-                break;
-        }
-        interval.base = value;
-        value = diff;
-    }
 
-    if (interval.hasValue) {
-        switch (mAggregationType) {
-            case ValueMetric::SUM:
-                // for AVG, we add up and take average when flushing the bucket
-            case ValueMetric::AVG:
-                interval.value += value;
-                break;
-            case ValueMetric::MIN:
-                interval.value = std::min(value, interval.value);
-                break;
-            case ValueMetric::MAX:
-                interval.value = std::max(value, interval.value);
-                break;
-            default:
-                break;
+        if (mUseDiff) {
+            // no base. just update base and return.
+            if (!interval.hasBase) {
+                interval.base = value;
+                interval.hasBase = true;
+                return;
+            }
+            Value diff;
+            switch (mValueDirection) {
+                case ValueMetric::INCREASING:
+                    if (value >= interval.base) {
+                        diff = value - interval.base;
+                    } else if (mUseAbsoluteValueOnReset) {
+                        diff = value;
+                    } else {
+                        VLOG("Unexpected decreasing value");
+                        StatsdStats::getInstance().notePullDataError(mPullTagId);
+                        interval.base = value;
+                        return;
+                    }
+                    break;
+                case ValueMetric::DECREASING:
+                    if (interval.base >= value) {
+                        diff = interval.base - value;
+                    } else if (mUseAbsoluteValueOnReset) {
+                        diff = value;
+                    } else {
+                        VLOG("Unexpected increasing value");
+                        StatsdStats::getInstance().notePullDataError(mPullTagId);
+                        interval.base = value;
+                        return;
+                    }
+                    break;
+                case ValueMetric::ANY:
+                    diff = value - interval.base;
+                    break;
+                default:
+                    break;
+            }
+            interval.base = value;
+            value = diff;
         }
-    } else {
-        interval.value = value;
-        interval.hasValue = true;
+
+        if (interval.hasValue) {
+            switch (mAggregationType) {
+                case ValueMetric::SUM:
+                    // for AVG, we add up and take average when flushing the bucket
+                case ValueMetric::AVG:
+                    interval.value += value;
+                    break;
+                case ValueMetric::MIN:
+                    interval.value = std::min(value, interval.value);
+                    break;
+                case ValueMetric::MAX:
+                    interval.value = std::max(value, interval.value);
+                    break;
+                default:
+                    break;
+            }
+        } else {
+            interval.value = value;
+            interval.hasValue = true;
+        }
+        interval.sampleSize += 1;
     }
-    interval.sampleSize += 1;
 
     // TODO: propgate proper values down stream when anomaly support doubles
-    long wholeBucketVal = interval.value.long_value;
+    long wholeBucketVal = multiIntervals[0].value.long_value;
     auto prev = mCurrentFullBucket.find(eventKey);
     if (prev != mCurrentFullBucket.end()) {
         wholeBucketVal += prev->second;
@@ -540,7 +566,9 @@
         VLOG("Skipping forward %lld buckets", (long long)numBucketsForward);
         // take base again in future good bucket.
         for (auto& slice : mCurrentSlicedBucket) {
-            slice.second.hasBase = false;
+            for (auto& interval : slice.second) {
+                interval.hasBase = false;
+            }
         }
     }
     VLOG("metric %lld: new bucket start time: %lld", (long long)mMetricId,
@@ -552,37 +580,38 @@
          (int)mCurrentSlicedBucket.size());
     int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
 
-    ValueBucket info;
-    info.mBucketStartNs = mCurrentBucketStartTimeNs;
-    if (eventTimeNs < fullBucketEndTimeNs) {
-        info.mBucketEndNs = eventTimeNs;
-    } else {
-        info.mBucketEndNs = fullBucketEndTimeNs;
-    }
+    int64_t bucketEndTime = eventTimeNs < fullBucketEndTimeNs ? eventTimeNs : fullBucketEndTimeNs;
 
-    if (info.mBucketEndNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) {
+    if (bucketEndTime - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) {
         // The current bucket is large enough to keep.
         for (const auto& slice : mCurrentSlicedBucket) {
-            if (slice.second.hasValue) {
-                // skip the output if the diff is zero
-                if (mSkipZeroDiffOutput && mUseDiff && slice.second.value.isZero()) {
-                    continue;
+            ValueBucket bucket;
+            bucket.mBucketStartNs = mCurrentBucketStartTimeNs;
+            bucket.mBucketEndNs = bucketEndTime;
+            for (const auto& interval : slice.second) {
+                if (interval.hasValue) {
+                    // skip the output if the diff is zero
+                    if (mSkipZeroDiffOutput && mUseDiff && interval.value.isZero()) {
+                        continue;
+                    }
+                    bucket.valueIndex.push_back(interval.valueIndex);
+                    if (mAggregationType != ValueMetric::AVG) {
+                        bucket.values.push_back(interval.value);
+                    } else {
+                        double sum = interval.value.type == LONG ? (double)interval.value.long_value
+                                                                 : interval.value.double_value;
+                        bucket.values.push_back(Value((double)sum / interval.sampleSize));
+                    }
                 }
-                if (mAggregationType != ValueMetric::AVG) {
-                    info.value = slice.second.value;
-                } else {
-                    double sum = slice.second.value.type == LONG
-                                         ? (double)slice.second.value.long_value
-                                         : slice.second.value.double_value;
-                    info.value.setDouble(sum / slice.second.sampleSize);
-                }
-                // it will auto create new vector of ValuebucketInfo if the key is not found.
+            }
+            // it will auto create new vector of ValuebucketInfo if the key is not found.
+            if (bucket.valueIndex.size() > 0) {
                 auto& bucketList = mPastBuckets[slice.first];
-                bucketList.push_back(info);
+                bucketList.push_back(bucket);
             }
         }
     } else {
-        mSkippedBuckets.emplace_back(info.mBucketStartNs, info.mBucketEndNs);
+        mSkippedBuckets.emplace_back(mCurrentBucketStartTimeNs, bucketEndTime);
     }
 
     if (eventTimeNs > fullBucketEndTimeNs) {  // If full bucket, send to anomaly tracker.
@@ -590,7 +619,7 @@
         if (mCurrentFullBucket.size() > 0) {
             for (const auto& slice : mCurrentSlicedBucket) {
                 // TODO: fix this when anomaly can accept double values
-                mCurrentFullBucket[slice.first] += slice.second.value.long_value;
+                mCurrentFullBucket[slice.first] += slice.second[0].value.long_value;
             }
             for (const auto& slice : mCurrentFullBucket) {
                 for (auto& tracker : mAnomalyTrackers) {
@@ -606,7 +635,7 @@
                 for (auto& tracker : mAnomalyTrackers) {
                     if (tracker != nullptr) {
                         // TODO: fix this when anomaly can accept double values
-                        tracker->addPastBucket(slice.first, slice.second.value.long_value,
+                        tracker->addPastBucket(slice.first, slice.second[0].value.long_value,
                                                mCurrentBucketNum);
                     }
                 }
@@ -616,14 +645,16 @@
         // Accumulate partial bucket.
         for (const auto& slice : mCurrentSlicedBucket) {
             // TODO: fix this when anomaly can accept double values
-            mCurrentFullBucket[slice.first] += slice.second.value.long_value;
+            mCurrentFullBucket[slice.first] += slice.second[0].value.long_value;
         }
     }
 
     // Reset counters
     for (auto& slice : mCurrentSlicedBucket) {
-        slice.second.hasValue = false;
-        slice.second.sampleSize = 0;
+        for (auto& interval : slice.second) {
+            interval.hasValue = false;
+            interval.sampleSize = 0;
+        }
     }
 }