blob: be4429750fb928430b9862fa4829efce68f40b63 [file] [log] [blame]
/*
* Copyright (C) 2020 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "src/trace_processor/rpc/query_result_serializer.h"
#include <vector>
#include "perfetto/protozero/packed_repeated_fields.h"
#include "perfetto/protozero/proto_utils.h"
#include "protos/perfetto/trace_processor/trace_processor.pbzero.h"
#include "src/trace_processor/iterator_impl.h"
namespace perfetto {
namespace trace_processor {
namespace {
namespace pu = ::protozero::proto_utils;
using BatchProto = protos::pbzero::QueryResult::CellsBatch;
using ResultProto = protos::pbzero::QueryResult;
// The reserved field in trace_processor.proto.
static constexpr uint32_t kPaddingFieldId = 7;
uint8_t MakeLenDelimTag(uint32_t field_num) {
uint32_t tag = pu::MakeTagLengthDelimited(field_num);
PERFETTO_DCHECK(tag <= 127); // Must fit in one byte.
return static_cast<uint8_t>(tag);
}
} // namespace
QueryResultSerializer::QueryResultSerializer(Iterator iter)
: iter_(iter.take_impl()), num_cols_(iter_->ColumnCount()) {}
QueryResultSerializer::~QueryResultSerializer() = default;
bool QueryResultSerializer::Serialize(std::vector<uint8_t>* buf) {
PERFETTO_CHECK(!eof_reached_);
// In non-production builds avoid the big reservation. This is to avoid hiding
// bugs that accidentally depend on pointer stability across resizes.
#if !PERFETTO_DCHECK_IS_ON()
buf->reserve(buf->size() + batch_split_threshold_ + 4096);
#endif
if (!did_write_column_names_) {
SerializeColumnNames(buf);
did_write_column_names_ = true;
}
// In case of an error we still want to go through SerializeBatch(). That will
// write an empty batch with the EOF marker. Errors can happen also in the
// middle of a query, not just before starting it.
SerializeBatch(buf);
MaybeSerializeError(buf);
return !eof_reached_;
}
void QueryResultSerializer::SerializeBatch(std::vector<uint8_t>* buf) {
// The buffer is filled in this way:
// - Append all the strings as we iterate through the results. The rationale
// is that strings are typically the largest part of the result and we want
// to avoid copying these.
// - While iterating, buffer all other types of cells. They will be appended
// at the end of the batch, after the string payload is known.
// Note: this function uses uint32_t instead of size_t because Wasm doesn't
// have yet native 64-bit integers and this is perf-sensitive.
const uint32_t initial_size = static_cast<uint32_t>(buf->size());
buf->push_back(MakeLenDelimTag(ResultProto::kBatchFieldNumber));
const uint32_t batch_size_hdr = static_cast<uint32_t>(buf->size());
buf->resize(batch_size_hdr + pu::kMessageLengthFieldSize);
// Start the |string_cells|.
buf->push_back(MakeLenDelimTag(BatchProto::kStringCellsFieldNumber));
const uint32_t strings_hdr_off = static_cast<uint32_t>(buf->size());
buf->resize(strings_hdr_off + pu::kMessageLengthFieldSize);
const uint32_t strings_start_off = static_cast<uint32_t>(buf->size());
// This keeps track of the overall size of the batch. It is used to decide if
// we need to prematurely end the batch, even if the batch_split_threshold_ is
// not reached. This is to guard against the degenerate case of appending a
// lot of very large strings and ending up with an enormous batch.
auto approx_batch_size = static_cast<uint32_t>(buf->size()) - initial_size;
std::vector<uint8_t> cell_types(cells_per_batch_);
// Varints and doubles are written on stack-based storage and appended later.
protozero::PackedVarInt varints;
protozero::PackedFixedSizeInt<double> doubles;
// We write blobs on a temporary heap buffer and append it at the end. Blobs
// are extremely rare, trying to avoid copies is not worth the complexity.
std::vector<uint8_t> blobs;
uint32_t cell_idx = 0;
bool batch_full = false;
// Skip block if the query didn't return any result (e.g. CREATE TABLE).
while (num_cols_ > 0) {
// Next needs to be called before iterating on a row. col_ is initialized
// at MAX_INT in the constructor, so in the very first iteration this causes
// a Next() call.
if (col_ >= num_cols_) {
col_ = 0;
if (!iter_->Next())
break; // EOF or error.
}
auto value = iter_->Get(col_);
uint8_t cell_type = BatchProto::CELL_INVALID;
switch (value.type) {
case SqlValue::Type::kNull: {
cell_type = BatchProto::CELL_NULL;
break;
}
case SqlValue::Type::kLong: {
cell_type = BatchProto::CELL_VARINT;
varints.Append(value.long_value);
approx_batch_size += 4; // Just a guess, doesn't need to be accurate.
break;
}
case SqlValue::Type::kDouble: {
cell_type = BatchProto::CELL_FLOAT64;
approx_batch_size += sizeof(double);
doubles.Append(value.double_value);
break;
}
case SqlValue::Type::kString: {
// Append the string to the one |string_cells| proto field, just use
// \0 to separate each string. We are deliberately NOT emitting one
// proto repeated field for each string. Doing so significantly slows
// down parsing on the JS side (go/postmessage-benchmark).
cell_type = BatchProto::CELL_STRING;
uint32_t len_with_nul =
static_cast<uint32_t>(strlen(value.string_value)) + 1;
const char* str_begin = value.string_value;
buf->insert(buf->end(), str_begin, str_begin + len_with_nul);
approx_batch_size += len_with_nul;
break;
}
case SqlValue::Type::kBytes: {
// Each blob is stored as its own repeated proto field, unlike strings.
// Blobs don't incur in text-decoding overhead (and are also rare).
cell_type = BatchProto::CELL_BLOB;
auto* src = static_cast<const uint8_t*>(value.bytes_value);
uint32_t len = static_cast<uint32_t>(value.bytes_count);
uint8_t preamble[16];
uint8_t* preamble_end = &preamble[0];
*(preamble_end++) = MakeLenDelimTag(BatchProto::kBlobCellsFieldNumber);
preamble_end = pu::WriteVarInt(len, preamble_end);
blobs.insert(blobs.end(), preamble, preamble_end);
blobs.insert(blobs.end(), src, src + len);
approx_batch_size += len + 4; // 4 is a guess on the preamble size.
break;
}
}
PERFETTO_DCHECK(cell_type != BatchProto::CELL_INVALID);
cell_types[cell_idx] = cell_type;
// The wrapping + iter_->Next() is done in the beginning to deal with
// the very first Next() call in one place.
++cell_idx;
++col_;
if (cell_idx >= cells_per_batch_ ||
approx_batch_size > batch_split_threshold_) {
batch_full = true;
break;
}
} // for (cell)
// Backfill the string size.
auto strings_size = static_cast<uint32_t>(buf->size() - strings_start_off);
pu::WriteRedundantVarInt(strings_size, buf->data() + strings_hdr_off);
// Write the cells headers (1 byte per cell).
{
uint8_t preamble[16];
uint8_t* preamble_end = &preamble[0];
*(preamble_end++) = MakeLenDelimTag(BatchProto::kCellsFieldNumber);
preamble_end = pu::WriteVarInt(cell_idx, preamble_end);
buf->insert(buf->end(), preamble, preamble_end);
buf->insert(buf->end(), cell_types.data(), cell_types.data() + cell_idx);
}
// Append the |varint_cells|, copying over the packed varint buffer.
const uint32_t varints_size = static_cast<uint32_t>(varints.size());
if (varints_size > 0) {
uint8_t preamble[16];
uint8_t* preamble_end = &preamble[0];
*(preamble_end++) = MakeLenDelimTag(BatchProto::kVarintCellsFieldNumber);
preamble_end = pu::WriteVarInt(varints_size, preamble_end);
buf->insert(buf->end(), preamble, preamble_end);
buf->insert(buf->end(), varints.data(), varints.data() + varints_size);
}
// Append the |float64_cells|, copying over the packed fixed64 buffer. This is
// appended at a 64-bit aligned offset, so that JS can access these by overlay
// a TypedArray, without extra copies.
const uint32_t doubles_size = static_cast<uint32_t>(doubles.size());
if (doubles_size > 0) {
uint8_t preamble[16];
uint8_t* preamble_end = &preamble[0];
*(preamble_end++) = MakeLenDelimTag(BatchProto::kFloat64CellsFieldNumber);
preamble_end = pu::WriteVarInt(doubles_size, preamble_end);
uint32_t preamble_size = static_cast<uint32_t>(preamble_end - &preamble[0]);
// The byte after the preamble must start at a 64bit-aligned offset.
// The padding needs to be > 1 Byte because of proto encoding.
const uint32_t off = static_cast<uint32_t>(buf->size() + preamble_size);
const uint32_t aligned_off = (off + 7) & ~7u;
uint32_t padding = aligned_off - off;
if (padding == 1)
padding = 9;
if (padding > 0) {
buf->push_back(pu::MakeTagVarInt(kPaddingFieldId));
for (uint32_t i = 0; i < padding - 2; i++)
buf->push_back(0x80);
buf->push_back(0);
}
buf->insert(buf->end(), preamble, preamble_end);
PERFETTO_CHECK(buf->size() % 8 == 0);
buf->insert(buf->end(), doubles.data(), doubles.data() + doubles_size);
} // if (doubles_size > 0)
// Append the blobs.
buf->insert(buf->end(), blobs.begin(), blobs.end());
// If this is the last batch, write the EOF field.
if (!batch_full) {
eof_reached_ = true;
auto kEofTag = pu::MakeTagVarInt(BatchProto::kIsLastBatchFieldNumber);
buf->push_back(static_cast<uint8_t>(kEofTag));
buf->push_back(1);
}
// Finally backfill the size of the whole |batch| sub-message.
const uint32_t batch_size = static_cast<uint32_t>(
buf->size() - batch_size_hdr - pu::kMessageLengthFieldSize);
pu::WriteRedundantVarInt(batch_size, buf->data() + batch_size_hdr);
}
void QueryResultSerializer::MaybeSerializeError(std::vector<uint8_t>* buf) {
if (iter_->Status().ok())
return;
std::string err = iter_->Status().message();
// Make sure the |error| field is always non-zero if the query failed, so
// the client can tell some error happened.
if (err.empty())
err = "Unknown error";
// Write the error and return.
uint8_t preamble[16];
uint8_t* preamble_end = &preamble[0];
*(preamble_end++) = MakeLenDelimTag(ResultProto::kErrorFieldNumber);
preamble_end = pu::WriteVarInt(err.size(), preamble_end);
buf->insert(buf->end(), preamble, preamble_end);
buf->insert(buf->end(), err.begin(), err.end());
}
void QueryResultSerializer::SerializeColumnNames(std::vector<uint8_t>* buf) {
PERFETTO_DCHECK(!did_write_column_names_);
for (uint32_t c = 0; c < num_cols_; c++) {
std::string col_name = iter_->GetColumnName(c);
uint8_t preamble[16];
uint8_t* preamble_end = &preamble[0];
*(preamble_end++) = MakeLenDelimTag(ResultProto::kColumnNamesFieldNumber);
preamble_end = pu::WriteVarInt(col_name.size(), preamble_end);
buf->insert(buf->end(), preamble, preamble_end);
buf->insert(buf->end(), col_name.begin(), col_name.end());
}
}
} // namespace trace_processor
} // namespace perfetto