// Copyright (C) 2018 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import {defer, Deferred} from '../base/deferred';
import {assertExists} from '../base/logging';
import {perfetto} from '../gen/protos';
import {ProtoRingBuffer} from './proto_ring_buffer';
import {
ComputeMetricArgs,
ComputeMetricResult,
RawQueryArgs,
RawQueryResult
} from './protos';
import {iter, NUM_NULL, slowlyCountRows, STR} from './query_iterator';
import {TimeSpan} from './time';
import TraceProcessorRpc = perfetto.protos.TraceProcessorRpc;
import TraceProcessorRpcStream = perfetto.protos.TraceProcessorRpcStream;
import TPM = perfetto.protos.TraceProcessorRpc.TraceProcessorMethod;
export interface LoadingTracker {
beginLoading(): void;
endLoading(): void;
}
export class NullLoadingTracker implements LoadingTracker {
beginLoading(): void {}
endLoading(): void {}
}
export class QueryError extends Error {}
/**
* Abstract interface of a trace processor.
* This is the TypeScript equivalent of src/trace_processor/rpc.h.
* There are two concrete implementations:
* 1. WasmEngineProxy: creates a Wasm module and interacts over postMessage().
* 2. HttpRpcEngine: connects to an external `trace_processor_shell --httpd`
* and interacts via fetch().
* In both cases, we have a byte-oriented pipe to interact with TraceProcessor.
* The derived class is only expected to deal with these two functions:
* 1. Implement the abstract rpcSendRequestBytes() function, sending the
* proto-encoded TraceProcessorRpc requests to the TraceProcessor instance.
* 2. Call onRpcResponseBytes() when response data is received.
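* An illustrative sketch of such a subclass is at the end of this file.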
*/
export abstract class Engine {
abstract readonly id: string;
private _cpus?: number[];
private _numGpus?: number;
private loadingTracker: LoadingTracker;
private txSeqId = 0;
private rxSeqId = 0;
private rxBuf = new ProtoRingBuffer();
private pendingParses = new Array<Deferred<void>>();
private pendingEOFs = new Array<Deferred<void>>();
private pendingRawQueries = new Array<Deferred<RawQueryResult>>();
private pendingRestoreTables = new Array<Deferred<void>>();
private pendingComputeMetrics = new Array<Deferred<ComputeMetricResult>>();
constructor(tracker?: LoadingTracker) {
this.loadingTracker = tracker ? tracker : new NullLoadingTracker();
}
/**
* Called to send data to the TraceProcessor instance. This turns into a
* postMessage() or a HTTP request, depending on the Engine implementation.
*/
abstract rpcSendRequestBytes(data: Uint8Array): void;
/**
* Called when an inbound message is received by the Engine implementation
* (e.g. onmessage for the Wasm case, or when HTTP replies are received for
* the HTTP+RPC case).
*/
onRpcResponseBytes(dataWillBeRetained: Uint8Array) {
// Note: when hitting the fastpath inside ProtoRingBuffer, the |data| buffer
// is returned by readMessage() (modulo subarray()-ing it) and held onto by
// other classes (e.g., QueryResult). For both fetch() and Wasm we are fine
// because every response creates a new buffer.
this.rxBuf.append(dataWillBeRetained);
for (;;) {
const msg = this.rxBuf.readMessage();
if (msg === undefined) break;
this.onRpcResponseMessage(msg);
}
}
/**
* Parses a response message.
* |rpcMsgEncoded| is a sub-array pointing to the start of a TraceProcessorRpc
* proto-encoded message (without the proto preamble and varint size).
*/
private onRpcResponseMessage(rpcMsgEncoded: Uint8Array) {
const rpc = TraceProcessorRpc.decode(rpcMsgEncoded);
this.loadingTracker.endLoading();
if (rpc.fatalError !== undefined && rpc.fatalError.length > 0) {
throw new Error(`${rpc.fatalError}`);
}
// Allow restarting sequences from zero (when reloading the browser).
if (rpc.seq !== this.rxSeqId + 1 && this.rxSeqId !== 0 && rpc.seq !== 0) {
// "(ERR:rpc_seq)" is intercepted by error_dialog.ts to show a more
// graceful and actionable error.
throw new Error(`RPC sequence id mismatch cur=${rpc.seq} last=${
this.rxSeqId} (ERR:rpc_seq)`);
}
this.rxSeqId = rpc.seq;
switch (rpc.response) {
case TPM.TPM_APPEND_TRACE_DATA:
const appendResult = assertExists(rpc.appendResult);
const pendingPromise = assertExists(this.pendingParses.shift());
if (appendResult.error && appendResult.error.length > 0) {
pendingPromise.reject(appendResult.error);
} else {
pendingPromise.resolve();
}
break;
case TPM.TPM_FINALIZE_TRACE_DATA:
assertExists(this.pendingEOFs.shift()).resolve();
break;
case TPM.TPM_RESTORE_INITIAL_TABLES:
assertExists(this.pendingRestoreTables.shift()).resolve();
break;
case TPM.TPM_QUERY_STREAMING:
// TODO(primiano): In the next CLs wire up the streaming query decoder.
break;
case TPM.TPM_QUERY_RAW_DEPRECATED:
const queryRes = assertExists(rpc.rawQueryResult) as RawQueryResult;
assertExists(this.pendingRawQueries.shift()).resolve(queryRes);
break;
case TPM.TPM_COMPUTE_METRIC:
const metricRes = assertExists(rpc.metricResult) as ComputeMetricResult;
if (metricRes.error && metricRes.error.length > 0) {
throw new QueryError(`ComputeMetric() error: ${metricRes.error}`);
}
assertExists(this.pendingComputeMetrics.shift()).resolve(metricRes);
break;
default:
console.log(
'Unexpected TraceProcessor response received: ', rpc.response);
break;
} // switch(rpc.response);
}
/**
* TraceProcessor methods below this point.
* The methods below are called by the various controllers in the UI and
* deal with marshalling / unmarshalling requests to/from TraceProcessor.
*/
/**
* Push trace data into the engine. The engine is supposed to automatically
* figure out the type of the trace (JSON vs Protobuf).
*/
parse(data: Uint8Array): Promise<void> {
const asyncRes = defer<void>();
this.pendingParses.push(asyncRes);
const rpc = TraceProcessorRpc.create();
rpc.request = TPM.TPM_APPEND_TRACE_DATA;
rpc.appendTraceData = data;
this.rpcSendRequest(rpc);
return asyncRes; // Linearize with the worker.
}
/**
* Notify the engine that we reached the end of the trace.
* Called after the last parse() call.
*/
notifyEof(): Promise<void> {
const asyncRes = defer<void>();
this.pendingEOFs.push(asyncRes);
const rpc = TraceProcessorRpc.create();
rpc.request = TPM.TPM_FINALIZE_TRACE_DATA;
this.rpcSendRequest(rpc);
return asyncRes; // Linearize with the worker.
}
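// Illustrative usage of parse()/notifyEof() (the chunk source below is
// hypothetical; any ordered sequence of Uint8Array chunks works):
//
//   for (const chunk of traceChunks) {
//     await engine.parse(chunk);  // Push trace bytes, in order.
//   }
//   await engine.notifyEof();     // No more data; finalize ingestion.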
/**
* Resets the trace processor state by destroying any tables/views created by
* the UI after loading.
*/
restoreInitialTables(): Promise<void> {
const asyncRes = defer<void>();
this.pendingRestoreTables.push(asyncRes);
const rpc = TraceProcessorRpc.create();
rpc.request = TPM.TPM_RESTORE_INITIAL_TABLES;
this.rpcSendRequest(rpc);
return asyncRes; // Linearize with the worker.
}
/**
* Shorthand for sending a compute metrics request to the engine.
*/
async computeMetric(metrics: string[]): Promise<ComputeMetricResult> {
const asyncRes = defer<ComputeMetricResult>();
this.pendingComputeMetrics.push(asyncRes);
const rpc = TraceProcessorRpc.create();
rpc.request = TPM.TPM_COMPUTE_METRIC;
const args = rpc.computeMetricArgs = new ComputeMetricArgs();
args.metricNames = metrics;
args.format = ComputeMetricArgs.ResultFormat.TEXTPROTO;
this.rpcSendRequest(rpc);
return asyncRes;
}
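// Illustrative usage of computeMetric(); the metric name is only an example
// and the exact payload layout depends on the requested metrics:
//
//   const res = await engine.computeMetric(['android_cpu']);
//   // res is a ComputeMetricResult; with the TEXTPROTO format requested
//   // above, the metrics come back as a text-encoded protobuf string.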
/**
* Runs a SQL query and throws if the query failed.
* Queries performed by the controller logic should use this.
*/
async query(sqlQuery: string): Promise<RawQueryResult> {
const result = await this.uncheckedQuery(sqlQuery);
if (result.error) {
throw new QueryError(`Query error "${sqlQuery}": ${result.error}`);
}
return result;
}
/**
* Runs a SQL query. Does not throw if the query fails; the caller must
* handle the failure. This makes it safe to use for user-typed SQL.
*/
uncheckedQuery(sqlQuery: string): Promise<RawQueryResult> {
const asyncRes = defer<RawQueryResult>();
this.pendingRawQueries.push(asyncRes);
const rpc = TraceProcessorRpc.create();
rpc.request = TPM.TPM_QUERY_RAW_DEPRECATED;
rpc.rawQueryArgs = new RawQueryArgs();
rpc.rawQueryArgs.sqlQuery = sqlQuery;
rpc.rawQueryArgs.timeQueuedNs = Math.floor(performance.now() * 1e6);
this.rpcSendRequest(rpc);
return asyncRes;
}
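// Illustrative usage of query() with the iterator helpers imported above
// (this mirrors getTracingMetadataTimeBounds() at the bottom of this class;
// the table and columns here are only an example):
//
//   const result = await engine.query('select name, int_value from metadata');
//   const it = iter({'name': STR, 'int_value': NUM_NULL}, result);
//   for (; it.valid(); it.next()) {
//     console.log(it.row.name, it.row.int_value);
//   }
/**
* Runs a query that is expected to return at most one row of numeric (long)
* columns and returns that row as an array of numbers. Returns an empty array
* if the query yields no rows; throws if a column has no long values.
*/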
async queryOneRow(query: string): Promise<number[]> {
const result = await this.query(query);
const res: number[] = [];
if (slowlyCountRows(result) === 0) return res;
for (const col of result.columns) {
if (col.longValues!.length === 0) {
const msg = `queryOneRow should only be used for queries that return ` +
`long values: ${query}`;
console.error(msg);
throw new Error(msg);
}
res.push(+col.longValues![0]);
}
return res;
}
/**
* Marshals the TraceProcessorRpc request arguments and sends the request
* to the concrete Engine (Wasm or HTTP).
*/
private rpcSendRequest(rpc: TraceProcessorRpc) {
rpc.seq = this.txSeqId++;
// Each message is wrapped in a TraceProcessorRpcStream to add the varint
// preamble with the size, which allows tokenization on the other end.
const outerProto = TraceProcessorRpcStream.create();
outerProto.msg.push(rpc);
const buf = TraceProcessorRpcStream.encode(outerProto).finish();
this.loadingTracker.beginLoading();
this.rpcSendRequestBytes(buf);
}
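// Wire-format sketch: each request is the serialized form of a one-element
// TraceProcessorRpcStream, i.e. a length-prefixed TraceProcessorRpc. The
// receiving end can recover it with the generated decoder, for instance:
//
//   const stream = TraceProcessorRpcStream.decode(buf);
//   const rpc = stream.msg[0];  // Carries seq, request and its arguments.
//
// Responses use the same framing; onRpcResponseBytes() tokenizes them
// incrementally via ProtoRingBuffer instead of decoding the whole stream.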
// TODO(hjd): When streaming is used, this cache must be invalidated somehow.
async getCpus(): Promise<number[]> {
if (!this._cpus) {
const result =
await this.query('select distinct(cpu) from sched order by cpu;');
if (slowlyCountRows(result) === 0) return [];
this._cpus = result.columns[0].longValues!.map(n => +n);
}
return this._cpus;
}
async getNumberOfGpus(): Promise<number> {
if (!this._numGpus) {
const result = await this.query(`
select count(distinct(gpu_id)) as gpuCount
from gpu_counter_track
where name = 'gpufreq';
`);
this._numGpus = +result.columns[0].longValues![0];
}
return this._numGpus;
}
// TODO: This should live in code that's more specific to chrome, instead of
// in engine.
async getNumberOfProcesses(): Promise<number> {
const result = await this.query('select count(*) from process;');
return +result.columns[0].longValues![0];
}
async getTraceTimeBounds(): Promise<TimeSpan> {
const query = `select start_ts, end_ts from trace_bounds`;
const res = (await this.queryOneRow(query));
return new TimeSpan(res[0] / 1e9, res[1] / 1e9);
}
async getTracingMetadataTimeBounds(): Promise<TimeSpan> {
const query = await this.query(`select name, int_value from metadata
where name = 'tracing_started_ns' or name = 'tracing_disabled_ns'
or name = 'all_data_source_started_ns'`);
let startBound = -Infinity;
let endBound = Infinity;
const it = iter({'name': STR, 'int_value': NUM_NULL}, query);
for (; it.valid(); it.next()) {
const columnName = it.row.name;
const timestamp = it.row.int_value;
if (timestamp === null) continue;
if (columnName === 'tracing_disabled_ns') {
endBound = Math.min(endBound, timestamp / 1e9);
} else {
startBound = Math.max(startBound, timestamp / 1e9);
}
}
return new TimeSpan(startBound, endBound);
}
}
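// Illustrative sketch of a concrete Engine subclass (not one of the real
// implementations mentioned in the class comment): it wires the two-function
// contract to a hypothetical Worker-based transport. Error handling and
// buffer-ownership details are elided.
//
//   class ExampleWorkerEngine extends Engine {
//     readonly id = 'example';
//
//     constructor(private worker: Worker) {
//       super();
//       // Every inbound message is fed to the shared RPC tokenizer, which
//       // reassembles TraceProcessorRpc messages and dispatches them.
//       this.worker.onmessage = (e: MessageEvent) =>
//           this.onRpcResponseBytes(new Uint8Array(e.data));
//     }
//
//     rpcSendRequestBytes(data: Uint8Array): void {
//       // Requests are already framed by rpcSendRequest(); just ship bytes.
//       this.worker.postMessage(data);
//     }
//   }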