blob: cb6166dafa3ae1daad871853e54d4b7b77b79927 [file] [log] [blame]
Chenjie Yub3dda412017-10-24 13:41:59 -07001/*
2 * Copyright (C) 2017 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#define DEBUG true // STOPSHIP if true
18#include "Log.h"
19
20#include "ValueMetricProducer.h"
21
22#include <cutils/log.h>
23#include <limits.h>
24#include <stdlib.h>
25
26using std::map;
27using std::unordered_map;
28using std::list;
29using std::make_shared;
30using std::shared_ptr;
31using std::unique_ptr;
32
33namespace android {
34namespace os {
35namespace statsd {
36
37// ValueMetric has a minimum bucket size of 10min so that we don't pull too frequently
38ValueMetricProducer::ValueMetricProducer(const ValueMetric& metric, const int conditionIndex,
39 const sp<ConditionWizard>& wizard)
40 : MetricProducer((time(nullptr) / 600 * 600 * NANO_SECONDS_IN_A_SECOND), conditionIndex,
41 wizard),
42 mMetric(metric),
43 mPullCode(mStatsPullerManager.GetPullCode(mMetric.what())) {
44 // TODO: valuemetric for pushed events may need unlimited bucket length
45 mBucketSizeNs = mMetric.bucket().bucket_size_millis() * 1000 * 1000;
46
47 mDimension.insert(mDimension.begin(), metric.dimension().begin(), metric.dimension().end());
48
49 if (metric.links().size() > 0) {
50 mConditionLinks.insert(mConditionLinks.begin(), metric.links().begin(),
51 metric.links().end());
52 mConditionSliced = true;
53 }
54
55 if (!metric.has_condition() && mPullCode != -1) {
56 mStatsPullerManager.RegisterReceiver(mPullCode, this, metric.bucket().bucket_size_millis());
57 }
58
59 VLOG("value metric %lld created. bucket size %lld start_time: %lld", metric.metric_id(),
60 (long long)mBucketSizeNs, (long long)mStartTimeNs);
61}
62
63ValueMetricProducer::~ValueMetricProducer() {
64 VLOG("~ValueMetricProducer() called");
65}
66
67void ValueMetricProducer::finish() {
68 // TODO: write the StatsLogReport to dropbox using
69 // DropboxWriter.
70}
71
72static void addSlicedCounterToReport(StatsLogReport_ValueMetricDataWrapper& wrapper,
73 const vector<KeyValuePair>& key,
74 const vector<ValueBucketInfo>& buckets) {
75 ValueMetricData* data = wrapper.add_data();
76 for (const auto& kv : key) {
77 data->add_dimension()->CopyFrom(kv);
78 }
79 for (const auto& bucket : buckets) {
80 data->add_bucket_info()->CopyFrom(bucket);
81 VLOG("\t bucket [%lld - %lld] value: %lld", bucket.start_bucket_nanos(),
82 bucket.end_bucket_nanos(), bucket.value());
83 }
84}
85
86void ValueMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) {
87 VLOG("Metric %lld onSlicedConditionMayChange", mMetric.metric_id());
88}
89
90StatsLogReport ValueMetricProducer::onDumpReport() {
91 VLOG("metric %lld dump report now...", mMetric.metric_id());
92
93 StatsLogReport report;
94 report.set_metric_id(mMetric.metric_id());
95 report.set_start_report_nanos(mStartTimeNs);
96
97 // Dump current bucket if it's stale.
98 // If current bucket is still on-going, don't force dump current bucket.
99 // In finish(), We can force dump current bucket.
100 // flush_if_needed(time(nullptr) * NANO_SECONDS_IN_A_SECOND);
101 report.set_end_report_nanos(mCurrentBucketStartTimeNs);
102
103 StatsLogReport_ValueMetricDataWrapper* wrapper = report.mutable_value_metrics();
104
105 for (const auto& pair : mPastBuckets) {
106 const HashableDimensionKey& hashableKey = pair.first;
107 auto it = mDimensionKeyMap.find(hashableKey);
108 if (it == mDimensionKeyMap.end()) {
109 ALOGE("Dimension key %s not found?!?! skip...", hashableKey.c_str());
110 continue;
111 }
112
113 VLOG(" dimension key %s", hashableKey.c_str());
114 addSlicedCounterToReport(*wrapper, it->second, pair.second);
115 }
116 return report;
117 // TODO: Clear mPastBuckets, mDimensionKeyMap once the report is dumped.
118}
119
120void ValueMetricProducer::onConditionChanged(const bool condition, const uint64_t eventTime) {
121 mCondition = condition;
122
123 if (mPullCode != -1) {
124 vector<shared_ptr<LogEvent>> allData = mStatsPullerManager.Pull(mPullCode, eventTime);
125 if (mCondition == true) {
126 mStatsPullerManager.RegisterReceiver(mPullCode, this,
127 mMetric.bucket().bucket_size_millis());
128 } else if (mCondition == ConditionState::kFalse) {
129 mStatsPullerManager.UnRegisterReceiver(mPullCode, this);
130 }
131 if (allData.size() == 0) {
132 return;
133 }
134 AutoMutex _l(mLock);
135 if (allData.size() == 0) {
136 return;
137 }
138 for (const auto& data : allData) {
139 onMatchedLogEvent(0, *data, false);
140 }
141 flush_if_needed(eventTime);
142 }
143 return;
144}
145
146void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) {
147 if (mCondition == ConditionState::kTrue || !mMetric.has_condition()) {
148 AutoMutex _l(mLock);
149 if (allData.size() == 0) {
150 return;
151 }
152 uint64_t eventTime = allData.at(0)->GetTimestampNs();
153 for (const auto& data : allData) {
154 onMatchedLogEvent(0, *data, true);
155 }
156 flush_if_needed(eventTime);
157 }
158}
159
160void ValueMetricProducer::onMatchedLogEventInternal(
161 const size_t matcherIndex, const HashableDimensionKey& eventKey,
162 const map<string, HashableDimensionKey>& conditionKey, bool condition,
163 const LogEvent& event, bool scheduledPull) {
164 uint64_t eventTimeNs = event.GetTimestampNs();
165 if (eventTimeNs < mCurrentBucketStartTimeNs) {
166 VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
167 (long long)mCurrentBucketStartTimeNs);
168 return;
169 }
170
171 Interval& interval = mCurrentSlicedBucket[eventKey];
172
173 long value = get_value(event);
174
175 if (scheduledPull) {
176 if (interval.raw.size() > 0) {
177 interval.raw.back().second = value;
178 } else {
179 interval.raw.push_back(std::make_pair(value, value));
180 }
181 mNextSlicedBucket[eventKey].raw[0].first = value;
182 } else {
183 if (mCondition == ConditionState::kTrue) {
184 interval.raw.push_back(std::make_pair(value, 0));
185 } else {
186 if (interval.raw.size() != 0) {
187 interval.raw.back().second = value;
188 }
189 }
190 }
191 if (mPullCode == -1) {
192 flush_if_needed(eventTimeNs);
193 }
194}
195
196long ValueMetricProducer::get_value(const LogEvent& event) {
197 status_t err = NO_ERROR;
198 long val = event.GetLong(mMetric.value_field(), &err);
199 if (err == NO_ERROR) {
200 return val;
201 } else {
202 VLOG("Can't find value in message.");
203 return 0;
204 }
205}
206
207void ValueMetricProducer::flush_if_needed(const uint64_t eventTimeNs) {
208 if (mCurrentBucketStartTimeNs + mBucketSizeNs > eventTimeNs) {
209 VLOG("eventTime is %lld, less than next bucket start time %lld", (long long)eventTimeNs,
210 (long long)(mCurrentBucketStartTimeNs + mBucketSizeNs));
211 return;
212 }
213
214 VLOG("finalizing bucket for %ld, dumping %d slices", (long)mCurrentBucketStartTimeNs,
215 (int)mCurrentSlicedBucket.size());
216 ValueBucketInfo info;
217 info.set_start_bucket_nanos(mCurrentBucketStartTimeNs);
218 info.set_end_bucket_nanos(mCurrentBucketStartTimeNs + mBucketSizeNs);
219
220 for (const auto& slice : mCurrentSlicedBucket) {
221 long value = 0;
222 for (const auto& pair : slice.second.raw) {
223 value += pair.second - pair.first;
224 }
225 info.set_value(value);
226 VLOG(" %s, %ld", slice.first.c_str(), value);
227 // it will auto create new vector of ValuebucketInfo if the key is not found.
228 auto& bucketList = mPastBuckets[slice.first];
229 bucketList.push_back(info);
230 }
231
232 // Reset counters
233 mCurrentSlicedBucket.swap(mNextSlicedBucket);
234 mNextSlicedBucket.clear();
235 int64_t numBucketsForward = (eventTimeNs - mCurrentBucketStartTimeNs) / mBucketSizeNs;
236 if (numBucketsForward >1) {
237 VLOG("Skipping forward %lld buckets", (long long)numBucketsForward);
238 }
239 mCurrentBucketStartTimeNs = mCurrentBucketStartTimeNs + numBucketsForward * mBucketSizeNs;
240 VLOG("metric %lld: new bucket start time: %lld", mMetric.metric_id(),
241 (long long)mCurrentBucketStartTimeNs);
242}
243
244} // namespace statsd
245} // namespace os
246} // namespace android