blob: 361b161c76ac03c4220621ed30dc80910abe5921 [file] [log] [blame]
/*
* Copyright (C) 2018 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#define DEBUG false // STOPSHIP if true
#include "Log.h"
#include "ShellSubscriber.h"
#include <android-base/file.h>
#include "matchers/matcher_util.h"
#include "stats_log_util.h"
using android::util::ProtoOutputStream;
namespace android {
namespace os {
namespace statsd {
const static int FIELD_ID_ATOM = 1;
void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) {
int myToken = claimToken();
VLOG("ShellSubscriber: new subscription %d has come in", myToken);
mSubscriptionShouldEnd.notify_one();
shared_ptr<SubscriptionInfo> mySubscriptionInfo = make_shared<SubscriptionInfo>(in, out);
if (!readConfig(mySubscriptionInfo)) return;
{
std::unique_lock<std::mutex> lock(mMutex);
if (myToken != mToken) {
// Some other subscription has already come in. Stop.
return;
}
mSubscriptionInfo = mySubscriptionInfo;
spawnHelperThreadsLocked(mySubscriptionInfo, myToken);
waitForSubscriptionToEndLocked(mySubscriptionInfo, myToken, lock, timeoutSec);
if (mSubscriptionInfo == mySubscriptionInfo) {
mSubscriptionInfo = nullptr;
}
}
}
void ShellSubscriber::spawnHelperThreadsLocked(shared_ptr<SubscriptionInfo> myInfo, int myToken) {
if (!myInfo->mPulledInfo.empty() && myInfo->mPullIntervalMin > 0) {
std::thread puller([this, myToken] { startPull(myToken); });
puller.detach();
}
std::thread heartbeatSender([this, myToken] { sendHeartbeats(myToken); });
heartbeatSender.detach();
}
void ShellSubscriber::waitForSubscriptionToEndLocked(shared_ptr<SubscriptionInfo> myInfo,
int myToken,
std::unique_lock<std::mutex>& lock,
int timeoutSec) {
if (timeoutSec > 0) {
mSubscriptionShouldEnd.wait_for(lock, timeoutSec * 1s, [this, myToken, &myInfo] {
return mToken != myToken || !myInfo->mClientAlive;
});
} else {
mSubscriptionShouldEnd.wait(lock, [this, myToken, &myInfo] {
return mToken != myToken || !myInfo->mClientAlive;
});
}
}
// Atomically claim the next token. Token numbers denote subscriber ordering.
int ShellSubscriber::claimToken() {
std::unique_lock<std::mutex> lock(mMutex);
int myToken = ++mToken;
return myToken;
}
// Read and parse single config. There should only one config per input.
bool ShellSubscriber::readConfig(shared_ptr<SubscriptionInfo> subscriptionInfo) {
// Read the size of the config.
size_t bufferSize;
if (!android::base::ReadFully(subscriptionInfo->mInputFd, &bufferSize, sizeof(bufferSize))) {
return false;
}
// Read the config.
vector<uint8_t> buffer(bufferSize);
if (!android::base::ReadFully(subscriptionInfo->mInputFd, buffer.data(), bufferSize)) {
return false;
}
// Parse the config.
ShellSubscription config;
if (!config.ParseFromArray(buffer.data(), bufferSize)) {
return false;
}
// Update SubscriptionInfo with state from config
for (const auto& pushed : config.pushed()) {
subscriptionInfo->mPushedMatchers.push_back(pushed);
}
int minInterval = -1;
for (const auto& pulled : config.pulled()) {
// All intervals need to be multiples of the min interval.
if (minInterval < 0 || pulled.freq_millis() < minInterval) {
minInterval = pulled.freq_millis();
}
vector<string> packages;
vector<int32_t> uids;
for (const string& pkg : pulled.packages()) {
auto it = UidMap::sAidToUidMapping.find(pkg);
if (it != UidMap::sAidToUidMapping.end()) {
uids.push_back(it->second);
} else {
packages.push_back(pkg);
}
}
subscriptionInfo->mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis(), packages,
uids);
VLOG("adding matcher for pulled atom %d", pulled.matcher().atom_id());
}
subscriptionInfo->mPullIntervalMin = minInterval;
return true;
}
void ShellSubscriber::startPull(int myToken) {
VLOG("ShellSubscriber: pull thread %d starting", myToken);
while (true) {
{
std::lock_guard<std::mutex> lock(mMutex);
if (!mSubscriptionInfo || mToken != myToken) {
VLOG("ShellSubscriber: pulling thread %d done!", myToken);
return;
}
int64_t nowMillis = getElapsedRealtimeMillis();
int64_t nowNanos = getElapsedRealtimeNs();
for (PullInfo& pullInfo : mSubscriptionInfo->mPulledInfo) {
if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval >= nowMillis) {
continue;
}
vector<int32_t> uids;
getUidsForPullAtom(&uids, pullInfo);
vector<std::shared_ptr<LogEvent>> data;
mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), uids, nowNanos, &data);
VLOG("Pulled %zu atoms with id %d", data.size(), pullInfo.mPullerMatcher.atom_id());
writePulledAtomsLocked(data, pullInfo.mPullerMatcher);
pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
}
}
VLOG("ShellSubscriber: pulling thread %d sleeping for %d ms", myToken,
mSubscriptionInfo->mPullIntervalMin);
std::this_thread::sleep_for(std::chrono::milliseconds(mSubscriptionInfo->mPullIntervalMin));
}
}
void ShellSubscriber::getUidsForPullAtom(vector<int32_t>* uids, const PullInfo& pullInfo) {
uids->insert(uids->end(), pullInfo.mPullUids.begin(), pullInfo.mPullUids.end());
// This is slow. Consider storing the uids per app and listening to uidmap updates.
for (const string& pkg : pullInfo.mPullPackages) {
set<int32_t> uidsForPkg = mUidMap->getAppUid(pkg);
uids->insert(uids->end(), uidsForPkg.begin(), uidsForPkg.end());
}
uids->push_back(DEFAULT_PULL_UID);
}
void ShellSubscriber::writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data,
const SimpleAtomMatcher& matcher) {
mProto.clear();
int count = 0;
for (const auto& event : data) {
if (matchesSimple(*mUidMap, matcher, *event)) {
count++;
uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
event->ToProto(mProto);
mProto.end(atomToken);
}
}
if (count > 0) attemptWriteToSocketLocked(mProto.size());
}
void ShellSubscriber::onLogEvent(const LogEvent& event) {
std::lock_guard<std::mutex> lock(mMutex);
if (!mSubscriptionInfo) return;
mProto.clear();
for (const auto& matcher : mSubscriptionInfo->mPushedMatchers) {
if (matchesSimple(*mUidMap, matcher, event)) {
uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
event.ToProto(mProto);
mProto.end(atomToken);
attemptWriteToSocketLocked(mProto.size());
}
}
}
// Tries to write the atom encoded in mProto to the socket. If the write fails
// because the read end of the pipe has closed, signals to other threads that
// the subscription should end.
void ShellSubscriber::attemptWriteToSocketLocked(size_t dataSize) {
// First write the payload size.
if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &dataSize, sizeof(dataSize))) {
mSubscriptionInfo->mClientAlive = false;
mSubscriptionShouldEnd.notify_one();
return;
}
if (dataSize == 0) return;
// Then, write the payload.
if (!mProto.flush(mSubscriptionInfo->mOutputFd)) {
mSubscriptionInfo->mClientAlive = false;
mSubscriptionShouldEnd.notify_one();
return;
}
mLastWriteMs = getElapsedRealtimeMillis();
}
// Send a heartbeat, consisting solely of a data size of 0, if perfd has not
// recently received any writes from statsd. When it receives the data size of
// 0, perfd will not expect any data and recheck whether the shell command is
// still running.
void ShellSubscriber::sendHeartbeats(int myToken) {
while (true) {
{
std::lock_guard<std::mutex> lock(mMutex);
if (!mSubscriptionInfo || myToken != mToken) {
VLOG("ShellSubscriber: heartbeat thread %d done!", myToken);
return;
}
if (getElapsedRealtimeMillis() - mLastWriteMs > kMsBetweenHeartbeats) {
VLOG("ShellSubscriber: sending a heartbeat to perfd");
attemptWriteToSocketLocked(/*dataSize=*/0);
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(kMsBetweenHeartbeats));
}
}
} // namespace statsd
} // namespace os
} // namespace android