Annotate trace packets with a trusted producer uid

This patch adds a trusted uid field to the trace packet message, which
identifies the POSIX user account which submitted the associated trace
data to the tracing service. The field is created by the service in a
way that makes it unspoofable. Later we will also include a mapping
from package names to uids in the trace.

Bug: 69964495,73283884
Change-Id: I8fdacdd00ab8efd5d5bca7d4b164dd3c4c7f0741
diff --git a/Android.bp b/Android.bp
index e4e538a..71abf8e 100644
--- a/Android.bp
+++ b/Android.bp
@@ -54,6 +54,7 @@
     "src/tracing/core/data_source_config.cc",
     "src/tracing/core/data_source_descriptor.cc",
     "src/tracing/core/id_allocator.cc",
+    "src/tracing/core/packet_stream_validator.cc",
     "src/tracing/core/service_impl.cc",
     "src/tracing/core/shared_memory_abi.cc",
     "src/tracing/core/shared_memory_arbiter_impl.cc",
@@ -119,6 +120,7 @@
     "src/tracing/core/data_source_config.cc",
     "src/tracing/core/data_source_descriptor.cc",
     "src/tracing/core/id_allocator.cc",
+    "src/tracing/core/packet_stream_validator.cc",
     "src/tracing/core/service_impl.cc",
     "src/tracing/core/shared_memory_abi.cc",
     "src/tracing/core/shared_memory_arbiter_impl.cc",
@@ -224,6 +226,7 @@
     "src/tracing/core/data_source_config.cc",
     "src/tracing/core/data_source_descriptor.cc",
     "src/tracing/core/id_allocator.cc",
+    "src/tracing/core/packet_stream_validator.cc",
     "src/tracing/core/service_impl.cc",
     "src/tracing/core/shared_memory_abi.cc",
     "src/tracing/core/shared_memory_arbiter_impl.cc",
@@ -1339,6 +1342,7 @@
     "protos/perfetto/trace/test_event.proto",
     "protos/perfetto/trace/trace.proto",
     "protos/perfetto/trace/trace_packet.proto",
+    "protos/perfetto/trace/trusted_packet.proto",
   ],
   tools: [
     "aprotoc",
@@ -1348,6 +1352,7 @@
     "external/perfetto/protos/perfetto/trace/test_event.pb.cc",
     "external/perfetto/protos/perfetto/trace/trace.pb.cc",
     "external/perfetto/protos/perfetto/trace/trace_packet.pb.cc",
+    "external/perfetto/protos/perfetto/trace/trusted_packet.pb.cc",
   ],
 }
 
@@ -1358,6 +1363,7 @@
     "protos/perfetto/trace/test_event.proto",
     "protos/perfetto/trace/trace.proto",
     "protos/perfetto/trace/trace_packet.proto",
+    "protos/perfetto/trace/trusted_packet.proto",
   ],
   tools: [
     "aprotoc",
@@ -1367,6 +1373,7 @@
     "external/perfetto/protos/perfetto/trace/test_event.pb.h",
     "external/perfetto/protos/perfetto/trace/trace.pb.h",
     "external/perfetto/protos/perfetto/trace/trace_packet.pb.h",
+    "external/perfetto/protos/perfetto/trace/trusted_packet.pb.h",
   ],
   export_include_dirs: [
     "protos",
@@ -1380,6 +1387,7 @@
     "protos/perfetto/trace/test_event.proto",
     "protos/perfetto/trace/trace.proto",
     "protos/perfetto/trace/trace_packet.proto",
+    "protos/perfetto/trace/trusted_packet.proto",
   ],
   tools: [
     "aprotoc",
@@ -1390,6 +1398,7 @@
     "external/perfetto/protos/perfetto/trace/test_event.pbzero.cc",
     "external/perfetto/protos/perfetto/trace/trace.pbzero.cc",
     "external/perfetto/protos/perfetto/trace/trace_packet.pbzero.cc",
+    "external/perfetto/protos/perfetto/trace/trusted_packet.pbzero.cc",
   ],
 }
 
@@ -1400,6 +1409,7 @@
     "protos/perfetto/trace/test_event.proto",
     "protos/perfetto/trace/trace.proto",
     "protos/perfetto/trace/trace_packet.proto",
+    "protos/perfetto/trace/trusted_packet.proto",
   ],
   tools: [
     "aprotoc",
@@ -1410,6 +1420,7 @@
     "external/perfetto/protos/perfetto/trace/test_event.pbzero.h",
     "external/perfetto/protos/perfetto/trace/trace.pbzero.h",
     "external/perfetto/protos/perfetto/trace/trace_packet.pbzero.h",
+    "external/perfetto/protos/perfetto/trace/trusted_packet.pbzero.h",
   ],
   export_include_dirs: [
     "protos",
@@ -1725,6 +1736,7 @@
     "src/tracing/core/data_source_config.cc",
     "src/tracing/core/data_source_descriptor.cc",
     "src/tracing/core/id_allocator.cc",
+    "src/tracing/core/packet_stream_validator.cc",
     "src/tracing/core/service_impl.cc",
     "src/tracing/core/shared_memory_abi.cc",
     "src/tracing/core/shared_memory_arbiter_impl.cc",
@@ -1850,6 +1862,8 @@
     "src/tracing/core/data_source_descriptor.cc",
     "src/tracing/core/id_allocator.cc",
     "src/tracing/core/id_allocator_unittest.cc",
+    "src/tracing/core/packet_stream_validator.cc",
+    "src/tracing/core/packet_stream_validator_unittest.cc",
     "src/tracing/core/service_impl.cc",
     "src/tracing/core/service_impl_unittest.cc",
     "src/tracing/core/shared_memory_abi.cc",
diff --git a/include/perfetto/ipc/client_info.h b/include/perfetto/ipc/client_info.h
index 4561f18..636bd76 100644
--- a/include/perfetto/ipc/client_info.h
+++ b/include/perfetto/ipc/client_info.h
@@ -17,6 +17,8 @@
 #ifndef INCLUDE_PERFETTO_IPC_CLIENT_INFO_H_
 #define INCLUDE_PERFETTO_IPC_CLIENT_INFO_H_
 
+#include <unistd.h>
+
 #include "perfetto/base/logging.h"
 #include "perfetto/ipc/basic_types.h"
 
@@ -27,7 +29,8 @@
 class ClientInfo {
  public:
   ClientInfo() = default;
-  ClientInfo(ClientID client_id, int uid) : client_id_(client_id), uid_(uid) {}
+  ClientInfo(ClientID client_id, uid_t uid)
+      : client_id_(client_id), uid_(uid) {}
 
   bool operator==(const ClientInfo& other) const {
     return (client_id_ == other.client_id_ && uid_ == other.uid_);
@@ -46,11 +49,11 @@
   ClientID client_id() const { return client_id_; }
 
   // Posix User ID. Comes from the kernel, can be trusted.
-  int uid() const { return uid_; }
+  uid_t uid() const { return uid_; }
 
  private:
   ClientID client_id_ = 0;
-  int uid_ = -1;
+  uid_t uid_ = -1;
 };
 
 }  // namespace ipc
diff --git a/include/perfetto/tracing/core/chunk.h b/include/perfetto/tracing/core/chunk.h
index 139b567..b76e08b 100644
--- a/include/perfetto/tracing/core/chunk.h
+++ b/include/perfetto/tracing/core/chunk.h
@@ -19,6 +19,7 @@
 
 #include <stddef.h>
 
+#include <memory>
 #include <vector>
 
 namespace perfetto {
@@ -26,9 +27,28 @@
 // A simple wrapper around a virtually contiguous memory range that contains a
 // TracePacket, or just a portion of it.
 struct Chunk {
+  Chunk() : start(nullptr), size(0) {}
   Chunk(const void* st, size_t sz) : start(st), size(sz) {}
+  Chunk(Chunk&& other) noexcept = default;
+
+  // Create a Chunk which contains (and owns) a copy of the given memory.
+  static Chunk Copy(const void* start, size_t size) {
+    Chunk c;
+    c.own_data_.reset(new uint8_t[size]);
+    c.size = size;
+    c.start = &c.own_data_[0];
+    memcpy(&c.own_data_[0], start, size);
+    return c;
+  }
+
   const void* start;
   size_t size;
+
+ private:
+  Chunk(const Chunk&) = delete;
+  void operator=(const Chunk&) = delete;
+
+  std::unique_ptr<uint8_t[]> own_data_;
 };
 
 // TODO(primiano): most TracePacket(s) fit in a chunk or two. We need something
diff --git a/include/perfetto/tracing/core/service.h b/include/perfetto/tracing/core/service.h
index 1acb2b3..d39641a 100644
--- a/include/perfetto/tracing/core/service.h
+++ b/include/perfetto/tracing/core/service.h
@@ -127,11 +127,14 @@
   // as the returned ProducerEndpoint is alive.
   // To disconnect just destroy the returned ProducerEndpoint object. It is safe
   // to destroy the Producer once the Producer::OnDisconnect() has been invoked.
+  // |uid| is the trusted user id of the producer process, used by the consumers
+  // for validating the origin of trace data.
   // |shared_buffer_size_hint_bytes| is an optional hint on the size of the
   // shared memory buffer. The service can ignore the hint (e.g., if the hint
   // is unreasonably large).
   virtual std::unique_ptr<ProducerEndpoint> ConnectProducer(
       Producer*,
+      uid_t uid,
       size_t shared_buffer_size_hint_bytes = 0) = 0;
 
   // Coonects a Consumer instance and obtains a ConsumerEndpoint, which is
diff --git a/protos/perfetto/trace/BUILD.gn b/protos/perfetto/trace/BUILD.gn
index 2d9ec8d..1a9d6f2 100644
--- a/protos/perfetto/trace/BUILD.gn
+++ b/protos/perfetto/trace/BUILD.gn
@@ -20,6 +20,7 @@
   "test_event.proto",
   "trace_packet.proto",
   "trace.proto",
+  "trusted_packet.proto",
 ]
 
 # Protozero generated stubs, for writers.
diff --git a/protos/perfetto/trace/trace_packet.proto b/protos/perfetto/trace/trace_packet.proto
index 7fc861d..635255a 100644
--- a/protos/perfetto/trace/trace_packet.proto
+++ b/protos/perfetto/trace/trace_packet.proto
@@ -27,7 +27,11 @@
 message TracePacket {
   oneof data {
     FtraceEventBundle ftrace_events = 1;
-    TestEvent test_event = 536870911;  // 2^29 - 1, max field id for protos.
+
+    // This field is only used for testing.
+    TestEvent for_testing = 536870911;  // 2^29 - 1, max field id for protos.
   }
-  optional string test = 2;
+  // Trusted user id of the producer which generated this packet. Keep in sync
+  // with TrustedPacket.trusted_uid.
+  oneof optional_trusted_uid { int32 trusted_uid = 3; };
 }
diff --git a/protos/perfetto/trace/trusted_packet.proto b/protos/perfetto/trace/trusted_packet.proto
new file mode 100644
index 0000000..9b8b417
--- /dev/null
+++ b/protos/perfetto/trace/trusted_packet.proto
@@ -0,0 +1,37 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Use proto3 syntax as an optimization. The difference is that proto2 stores
+// unknown fields seen while decoding in an internal buffer (std::string) while
+// proto3 completely drops them. Since during validation we only need to check
+// for the presence of the trusted fields below, we can use proto3 as a way to
+// speed up this process.
+//
+// See https://developers.google.com/protocol-buffers/docs/proto3#unknowns and
+// https://android-review.googlesource.com/c/platform/external/perfetto/+/
+// 591673#17 for details.
+syntax = "proto3";
+option optimize_for = LITE_RUNTIME;
+
+package perfetto.protos;
+
+// This proto contains trusted fields of TracePacket which may only be generated
+// by the service (as opposed to the untrusted producers). Note that the field
+// ids here must be kept in sync with TracePacket.
+message TrustedPacket {
+  // User id of the producer which generated this packet.
+  oneof optional_trusted_uid { int32 trusted_uid = 3; };
+}
diff --git a/src/traced/perfetto_cmd/perfetto_cmd.cc b/src/traced/perfetto_cmd/perfetto_cmd.cc
index ff2a1ae..9fe6db3 100644
--- a/src/traced/perfetto_cmd/perfetto_cmd.cc
+++ b/src/traced/perfetto_cmd/perfetto_cmd.cc
@@ -39,6 +39,7 @@
 #include "perfetto/tracing/ipc/consumer_ipc_client.h"
 
 #include "perfetto/config/trace_config.pb.h"
+#include "perfetto/trace/trace.pb.h"
 
 #if defined(PERFETTO_BUILD_WITH_ANDROID)
 #include "perfetto/base/android_task_runner.h"
@@ -262,7 +263,8 @@
     for (const Chunk& chunk : packet) {
       uint8_t preamble[16];
       uint8_t* pos = preamble;
-      pos = WriteVarInt(MakeTagLengthDelimited(1 /* field_id */), pos);
+      pos = WriteVarInt(
+          MakeTagLengthDelimited(protos::Trace::kPacketFieldNumber), pos);
       pos = WriteVarInt(static_cast<uint32_t>(chunk.size), pos);
       fwrite(reinterpret_cast<const char*>(preamble), pos - preamble, 1,
              trace_out_stream_.get());
diff --git a/src/tracing/BUILD.gn b/src/tracing/BUILD.gn
index 03acfa2..3811987 100644
--- a/src/tracing/BUILD.gn
+++ b/src/tracing/BUILD.gn
@@ -24,6 +24,7 @@
   ]
   deps = [
     "../../gn:default_deps",
+    "../../gn:gtest_prod_config",
     "../../protos/perfetto/config",
     "../base",
   ]
@@ -34,6 +35,8 @@
     "core/data_source_descriptor.cc",
     "core/id_allocator.cc",
     "core/id_allocator.h",
+    "core/packet_stream_validator.cc",
+    "core/packet_stream_validator.h",
     "core/service_impl.cc",
     "core/service_impl.h",
     "core/shared_memory_abi.cc",
@@ -112,6 +115,7 @@
   sources = [
     "core/chunked_protobuf_input_stream_unittest.cc",
     "core/id_allocator_unittest.cc",
+    "core/packet_stream_validator_unittest.cc",
     "core/service_impl_unittest.cc",
     "core/shared_memory_abi_unittest.cc",
     "core/shared_memory_arbiter_impl_unittest.cc",
diff --git a/src/tracing/core/packet_stream_validator.cc b/src/tracing/core/packet_stream_validator.cc
new file mode 100644
index 0000000..b0728fe
--- /dev/null
+++ b/src/tracing/core/packet_stream_validator.cc
@@ -0,0 +1,47 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/tracing/core/packet_stream_validator.h"
+
+#include <inttypes.h>
+#include <stddef.h>
+
+#include "perfetto/base/logging.h"
+#include "perfetto/protozero/proto_utils.h"
+#include "perfetto/trace/trace_packet.pb.h"
+#include "perfetto/trace/trusted_packet.pb.h"
+
+namespace perfetto {
+
+// static
+bool PacketStreamValidator::Validate(const ChunkSequence& sequence) {
+  static_assert(protos::TracePacket::kTrustedUidFieldNumber ==
+                    protos::TrustedPacket::kTrustedUidFieldNumber,
+                "trusted uid field id mismatch");
+  ChunkedProtobufInputStream stream(&sequence);
+  size_t size = 0;
+  for (const Chunk& chunk : sequence)
+    size += chunk.size;
+
+  protos::TrustedPacket packet;
+  if (!packet.ParseFromBoundedZeroCopyStream(&stream, size))
+    return false;
+  // Only the service is allowed to fill in the trusted uid.
+  return packet.optional_trusted_uid_case() ==
+         protos::TrustedPacket::OPTIONAL_TRUSTED_UID_NOT_SET;
+}
+
+}  // namespace perfetto
diff --git a/src/tracing/core/packet_stream_validator.h b/src/tracing/core/packet_stream_validator.h
new file mode 100644
index 0000000..a334daa
--- /dev/null
+++ b/src/tracing/core/packet_stream_validator.h
@@ -0,0 +1,42 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_TRACING_CORE_PACKET_STREAM_VALIDATOR_H_
+#define SRC_TRACING_CORE_PACKET_STREAM_VALIDATOR_H_
+
+#include "src/tracing/core/chunked_protobuf_input_stream.h"
+
+namespace perfetto {
+
+// Checks that the stream of trace packets sent by the producer is well formed.
+// This includes:
+//
+// - Checking that the packets are not truncated.
+// - There are no dangling bytes left over in the packets.
+// - Any trusted fields (e.g., uid) are not set.
+//
+// Note that we only validate top-level fields in the trace proto; sub-messages
+// are simply skipped.
+class PacketStreamValidator {
+ public:
+  PacketStreamValidator() = delete;
+
+  static bool Validate(const ChunkSequence&);
+};
+
+}  // namespace perfetto
+
+#endif  // SRC_TRACING_CORE_PACKET_STREAM_VALIDATOR_H_
diff --git a/src/tracing/core/packet_stream_validator_unittest.cc b/src/tracing/core/packet_stream_validator_unittest.cc
new file mode 100644
index 0000000..1f6f6b8
--- /dev/null
+++ b/src/tracing/core/packet_stream_validator_unittest.cc
@@ -0,0 +1,175 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/tracing/core/packet_stream_validator.h"
+
+#include <string>
+
+#include "gtest/gtest.h"
+
+#include "perfetto/trace/trace_packet.pb.h"
+
+namespace perfetto {
+namespace {
+
+TEST(PacketStreamValidatorTest, NullPacket) {
+  std::string ser_buf;
+  ChunkSequence seq;
+  EXPECT_TRUE(PacketStreamValidator::Validate(seq));
+}
+
+TEST(PacketStreamValidatorTest, SimplePacket) {
+  protos::TracePacket proto;
+  proto.mutable_for_testing()->set_str("string field");
+  std::string ser_buf = proto.SerializeAsString();
+
+  ChunkSequence seq;
+  seq.emplace_back(&ser_buf[0], ser_buf.size());
+  EXPECT_TRUE(PacketStreamValidator::Validate(seq));
+}
+
+TEST(PacketStreamValidatorTest, ComplexPacket) {
+  protos::TracePacket proto;
+  proto.mutable_for_testing()->set_str("string field");
+  proto.mutable_ftrace_events()->set_cpu(0);
+  auto* ft = proto.mutable_ftrace_events()->add_event();
+  ft->set_pid(42);
+  ft->mutable_sched_switch()->set_prev_comm("tom");
+  ft->mutable_sched_switch()->set_prev_pid(123);
+  ft->mutable_sched_switch()->set_next_comm("jerry");
+  ft->mutable_sched_switch()->set_next_pid(456);
+  std::string ser_buf = proto.SerializeAsString();
+
+  ChunkSequence seq;
+  seq.emplace_back(&ser_buf[0], ser_buf.size());
+  EXPECT_TRUE(PacketStreamValidator::Validate(seq));
+}
+
+TEST(PacketStreamValidatorTest, SimplePacketWithUid) {
+  protos::TracePacket proto;
+  proto.set_trusted_uid(123);
+  std::string ser_buf = proto.SerializeAsString();
+
+  ChunkSequence seq;
+  seq.emplace_back(&ser_buf[0], ser_buf.size());
+  EXPECT_FALSE(PacketStreamValidator::Validate(seq));
+}
+
+TEST(PacketStreamValidatorTest, SimplePacketWithZeroUid) {
+  protos::TracePacket proto;
+  proto.set_trusted_uid(0);
+  std::string ser_buf = proto.SerializeAsString();
+
+  ChunkSequence seq;
+  seq.emplace_back(&ser_buf[0], ser_buf.size());
+  EXPECT_FALSE(PacketStreamValidator::Validate(seq));
+}
+
+TEST(PacketStreamValidatorTest, SimplePacketWithNegativeOneUid) {
+  protos::TracePacket proto;
+  proto.set_trusted_uid(-1);
+  std::string ser_buf = proto.SerializeAsString();
+
+  ChunkSequence seq;
+  seq.emplace_back(&ser_buf[0], ser_buf.size());
+  EXPECT_FALSE(PacketStreamValidator::Validate(seq));
+}
+
+TEST(PacketStreamValidatorTest, ComplexPacketWithUid) {
+  protos::TracePacket proto;
+  proto.mutable_for_testing()->set_str("string field");
+  proto.mutable_ftrace_events()->set_cpu(0);
+  auto* ft = proto.mutable_ftrace_events()->add_event();
+  ft->set_pid(42);
+  ft->mutable_sched_switch()->set_prev_comm("tom");
+  ft->mutable_sched_switch()->set_prev_pid(123);
+  ft->mutable_sched_switch()->set_next_comm("jerry");
+  ft->mutable_sched_switch()->set_next_pid(456);
+  proto.set_trusted_uid(123);
+  std::string ser_buf = proto.SerializeAsString();
+
+  ChunkSequence seq;
+  seq.emplace_back(&ser_buf[0], ser_buf.size());
+  EXPECT_FALSE(PacketStreamValidator::Validate(seq));
+}
+
+TEST(PacketStreamValidatorTest, FragmentedPacket) {
+  protos::TracePacket proto;
+  proto.mutable_for_testing()->set_str("string field");
+  proto.mutable_ftrace_events()->set_cpu(0);
+  auto* ft = proto.mutable_ftrace_events()->add_event();
+  ft->set_pid(42);
+  ft->mutable_sched_switch()->set_prev_comm("tom");
+  ft->mutable_sched_switch()->set_prev_pid(123);
+  ft->mutable_sched_switch()->set_next_comm("jerry");
+  ft->mutable_sched_switch()->set_next_pid(456);
+  std::string ser_buf = proto.SerializeAsString();
+
+  for (size_t i = 0; i < ser_buf.size(); i++) {
+    ChunkSequence seq;
+    seq.emplace_back(&ser_buf[0], i);
+    seq.emplace_back(&ser_buf[i], ser_buf.size() - i);
+    EXPECT_TRUE(PacketStreamValidator::Validate(seq));
+  }
+}
+
+TEST(PacketStreamValidatorTest, FragmentedPacketWithUid) {
+  protos::TracePacket proto;
+  proto.mutable_for_testing()->set_str("string field");
+  proto.set_trusted_uid(123);
+  proto.mutable_ftrace_events()->set_cpu(0);
+  auto* ft = proto.mutable_ftrace_events()->add_event();
+  ft->set_pid(42);
+  ft->mutable_sched_switch()->set_prev_comm("tom");
+  ft->mutable_sched_switch()->set_prev_pid(123);
+  ft->mutable_sched_switch()->set_next_comm("jerry");
+  ft->mutable_sched_switch()->set_next_pid(456);
+  proto.mutable_for_testing()->set_str("foo");
+  std::string ser_buf = proto.SerializeAsString();
+
+  for (size_t i = 0; i < ser_buf.size(); i++) {
+    ChunkSequence seq;
+    seq.emplace_back(&ser_buf[0], i);
+    seq.emplace_back(&ser_buf[i], ser_buf.size() - i);
+    EXPECT_FALSE(PacketStreamValidator::Validate(seq));
+  }
+}
+
+TEST(PacketStreamValidatorTest, TruncatedPacket) {
+  protos::TracePacket proto;
+  proto.mutable_for_testing()->set_str("string field");
+  std::string ser_buf = proto.SerializeAsString();
+
+  for (size_t i = 1; i < ser_buf.size(); i++) {
+    ChunkSequence seq;
+    seq.emplace_back(&ser_buf[0], i);
+    EXPECT_FALSE(PacketStreamValidator::Validate(seq));
+  }
+}
+
+TEST(PacketStreamValidatorTest, TrailingGarbage) {
+  protos::TracePacket proto;
+  proto.mutable_for_testing()->set_str("string field");
+  std::string ser_buf = proto.SerializeAsString();
+  ser_buf += "bike is short for bichael";
+
+  ChunkSequence seq;
+  seq.emplace_back(&ser_buf[0], ser_buf.size());
+  EXPECT_FALSE(PacketStreamValidator::Validate(seq));
+}
+
+}  // namespace
+}  // namespace perfetto
diff --git a/src/tracing/core/service_impl.cc b/src/tracing/core/service_impl.cc
index 3ec7644..e80dbaa 100644
--- a/src/tracing/core/service_impl.cc
+++ b/src/tracing/core/service_impl.cc
@@ -30,6 +30,10 @@
 #include "perfetto/tracing/core/producer.h"
 #include "perfetto/tracing/core/shared_memory.h"
 #include "perfetto/tracing/core/trace_packet.h"
+#include "src/tracing/core/packet_stream_validator.h"
+
+#include "perfetto/trace/trace_packet.pb.h"
+#include "perfetto/trace/trusted_packet.pb.h"
 
 // General note: this class must assume that Producers are malicious and will
 // try to crash / exploit this class. We can trust pointers because they come
@@ -38,7 +42,9 @@
 
 namespace perfetto {
 
+using protozero::proto_utils::MakeTagVarInt;
 using protozero::proto_utils::ParseVarInt;
+using protozero::proto_utils::WriteVarInt;
 
 namespace {
 constexpr size_t kDefaultShmSize = base::kPageSize * 64;  // 256 KB.
@@ -69,6 +75,7 @@
 
 std::unique_ptr<Service::ProducerEndpoint> ServiceImpl::ConnectProducer(
     Producer* producer,
+    uid_t uid,
     size_t shared_buffer_size_hint_bytes) {
   PERFETTO_DCHECK_THREAD(thread_checker_);
   const ProducerID id = ++last_producer_id_;
@@ -82,7 +89,7 @@
   // to go away.
   auto shared_memory = shm_factory_->CreateSharedMemory(shm_size);
   std::unique_ptr<ProducerEndpointImpl> endpoint(new ProducerEndpointImpl(
-      id, this, task_runner_, producer, std::move(shared_memory)));
+      id, uid, this, task_runner_, producer, std::move(shared_memory)));
   auto it_and_inserted = producers_.emplace(id, endpoint.get());
   PERFETTO_DCHECK(it_and_inserted.second);
   task_runner_->PostTask(std::bind(&Producer::OnConnect, endpoint->producer_));
@@ -297,6 +304,7 @@
       const size_t page_idx = (i + tbuf.cur_page) % tbuf.num_pages();
       if (abi.is_page_free(page_idx))
         continue;
+      const uid_t page_owner = tbuf.get_page_owner(page_idx);
       uint32_t layout = abi.page_layout_dbg(page_idx);
       size_t num_chunks = abi.GetNumChunksForLayout(layout);
       for (size_t chunk_idx = 0; chunk_idx < num_chunks; chunk_idx++) {
@@ -333,9 +341,33 @@
             PERFETTO_DLOG("out of bounds!");
             break;
           }
+          ChunkSequence chunk_seq;
+          chunk_seq.emplace_back(ptr, pack_size);
+          if (!skip && !PacketStreamValidator::Validate(chunk_seq)) {
+            PERFETTO_DLOG("Dropping invalid packet");
+            skip = true;
+          }
+
           if (!skip) {
             packets->emplace_back();
-            packets->back().AddChunk(Chunk(ptr, pack_size));
+            for (Chunk& validated_chunk : chunk_seq)
+              packets->back().AddChunk(std::move(validated_chunk));
+
+            // Append a chunk with the trusted UID of the producer. This can't
+            // be spoofed because above we validated that the existing chunks
+            // don't contain any trusted UID fields. For added safety we append
+            // instead of prepending because according to protobuf semantics, if
+            // the same field is encountered multiple times the last instance
+            // takes priority. Note that truncated packets are also rejected, so
+            // the producer can't give us a partial packet (e.g., a truncated
+            // string) which only becomes valid when the UID is appended here.
+            protos::TrustedPacket trusted_packet;
+            trusted_packet.set_trusted_uid(page_owner);
+            uint8_t trusted_buf[16];
+            PERFETTO_CHECK(trusted_packet.SerializeToArray(
+                &trusted_buf, sizeof(trusted_buf)));
+            packets->back().AddChunk(
+                Chunk::Copy(trusted_buf, trusted_packet.ByteSize()));
           }
           ptr += pack_size;
         }  // for(packet)
@@ -466,7 +498,8 @@
   // log buffer that has nothing to do with it.
 
   PERFETTO_DCHECK(size == kBufferPageSize);
-  uint8_t* dst = buf.get_next_page();
+  uid_t uid = GetProducer(producer_id)->uid_;
+  uint8_t* dst = buf.acquire_next_page(uid);
 
   // TODO(primiano): use sendfile(). Requires to make the tbuf itself
   // a file descriptor (just use SharedMemory without sharing it).
@@ -543,11 +576,13 @@
 
 ServiceImpl::ProducerEndpointImpl::ProducerEndpointImpl(
     ProducerID id,
+    uid_t uid,
     ServiceImpl* service,
     base::TaskRunner* task_runner,
     Producer* producer,
     std::unique_ptr<SharedMemory> shared_memory)
     : id_(id),
+      uid_(uid),
       service_(service),
       task_runner_(task_runner),
       producer_(producer),
@@ -639,6 +674,8 @@
   }
   size = sz;
   abi.reset(new SharedMemoryABI(get_page(0), size, kBufferPageSize));
+  PERFETTO_DCHECK(page_owners.empty());
+  page_owners.resize(size, -1);
   return true;
 }
 
diff --git a/src/tracing/core/service_impl.h b/src/tracing/core/service_impl.h
index 0883264..cb1ab16 100644
--- a/src/tracing/core/service_impl.h
+++ b/src/tracing/core/service_impl.h
@@ -22,6 +22,7 @@
 #include <memory>
 #include <set>
 
+#include "gtest/gtest_prod.h"
 #include "perfetto/base/page_allocator.h"
 #include "perfetto/base/weak_ptr.h"
 #include "perfetto/tracing/core/basic_types.h"
@@ -52,6 +53,7 @@
   class ProducerEndpointImpl : public Service::ProducerEndpoint {
    public:
     ProducerEndpointImpl(ProducerID,
+                         uid_t uid,
                          ServiceImpl*,
                          base::TaskRunner*,
                          Producer*,
@@ -69,10 +71,12 @@
 
    private:
     friend class ServiceImpl;
+    FRIEND_TEST(ServiceImplTest, RegisterAndUnregister);
     ProducerEndpointImpl(const ProducerEndpointImpl&) = delete;
     ProducerEndpointImpl& operator=(const ProducerEndpointImpl&) = delete;
 
     ProducerID const id_;
+    const uid_t uid_;
     ServiceImpl* const service_;
     base::TaskRunner* const task_runner_;
     Producer* producer_;
@@ -134,6 +138,7 @@
   // Service implementation.
   std::unique_ptr<Service::ProducerEndpoint> ConnectProducer(
       Producer*,
+      uid_t uid,
       size_t shared_buffer_size_hint_bytes = 0) override;
 
   std::unique_ptr<Service::ConsumerEndpoint> ConnectConsumer(
@@ -164,9 +169,15 @@
       return reinterpret_cast<uint8_t*>(data.get()) + page * kBufferPageSize;
     }
 
-    uint8_t* get_next_page() {
+    uid_t get_page_owner(size_t page) const {
+      PERFETTO_DCHECK(page < num_pages());
+      return page_owners[page];
+    }
+
+    uint8_t* acquire_next_page(uid_t uid) {
       size_t cur = cur_page;
       cur_page = cur_page == num_pages() - 1 ? 0 : cur_page + 1;
+      page_owners[cur] = uid;
       return get_page(cur);
     }
 
@@ -179,6 +190,9 @@
     // the convenience of SharedMemoryABI for bookkeeping of the buffer when
     // implementing ReadBuffers().
     std::unique_ptr<SharedMemoryABI> abi;
+
+    // Trusted uid for each acquired page.
+    std::vector<uid_t> page_owners;
   };
 
   // Holds the state of a tracing session. A tracing session is uniquely bound
diff --git a/src/tracing/core/service_impl_unittest.cc b/src/tracing/core/service_impl_unittest.cc
index 44ee135..8f13777 100644
--- a/src/tracing/core/service_impl_unittest.cc
+++ b/src/tracing/core/service_impl_unittest.cc
@@ -28,12 +28,12 @@
 #include "src/tracing/test/test_shared_memory.h"
 
 namespace perfetto {
-namespace {
-
 using ::testing::_;
 using ::testing::InSequence;
 using ::testing::Mock;
 
+namespace {
+
 class MockProducer : public Producer {
  public:
   ~MockProducer() override {}
@@ -46,7 +46,9 @@
   MOCK_METHOD1(TearDownDataSourceInstance, void(DataSourceInstanceID));
 };
 
-TEST(ServiceImpl, RegisterAndUnregister) {
+}  // namespace
+
+TEST(ServiceImplTest, RegisterAndUnregister) {
   base::TestTaskRunner task_runner;
   auto shm_factory =
       std::unique_ptr<SharedMemory::Factory>(new TestSharedMemory::Factory());
@@ -55,9 +57,9 @@
   MockProducer mock_producer_1;
   MockProducer mock_producer_2;
   std::unique_ptr<Service::ProducerEndpoint> producer_endpoint_1 =
-      svc->ConnectProducer(&mock_producer_1);
+      svc->ConnectProducer(&mock_producer_1, 123u /* uid */);
   std::unique_ptr<Service::ProducerEndpoint> producer_endpoint_2 =
-      svc->ConnectProducer(&mock_producer_2);
+      svc->ConnectProducer(&mock_producer_2, 456u /* uid */);
 
   ASSERT_TRUE(producer_endpoint_1);
   ASSERT_TRUE(producer_endpoint_2);
@@ -70,6 +72,8 @@
   ASSERT_EQ(2u, svc->num_producers());
   ASSERT_EQ(producer_endpoint_1.get(), svc->GetProducer(1));
   ASSERT_EQ(producer_endpoint_2.get(), svc->GetProducer(2));
+  ASSERT_EQ(123u, svc->GetProducer(1)->uid_);
+  ASSERT_EQ(456u, svc->GetProducer(2)->uid_);
 
   DataSourceDescriptor ds_desc1;
   ds_desc1.set_name("foo");
@@ -109,5 +113,4 @@
   ASSERT_EQ(0u, svc->num_producers());
 }
 
-}  // namespace
 }  // namespace perfetto
diff --git a/src/tracing/core/trace_packet.cc b/src/tracing/core/trace_packet.cc
index 648beaf..3f711c7 100644
--- a/src/tracing/core/trace_packet.cc
+++ b/src/tracing/core/trace_packet.cc
@@ -41,8 +41,8 @@
 }
 
 void TracePacket::AddChunk(Chunk chunk) {
-  chunks_.push_back(chunk);
   size_ += chunk.size;
+  chunks_.push_back(std::move(chunk));
 }
 
 }  // namespace perfetto
diff --git a/src/tracing/core/trace_packet_unittest.cc b/src/tracing/core/trace_packet_unittest.cc
index 5bed01b..914249d 100644
--- a/src/tracing/core/trace_packet_unittest.cc
+++ b/src/tracing/core/trace_packet_unittest.cc
@@ -27,7 +27,7 @@
 
 TEST(TracePacketTest, Simple) {
   protos::TracePacket proto;
-  proto.set_test("string field");
+  proto.mutable_for_testing()->set_str("string field");
   std::string ser_buf = proto.SerializeAsString();
   TracePacket tp;
   tp.AddChunk({ser_buf.data(), ser_buf.size()});
@@ -40,23 +40,24 @@
   ASSERT_TRUE(tp.Decode());
   ASSERT_TRUE(tp.Decode());  // Decode() should be idempotent.
   ASSERT_NE(nullptr, tp.operator->());
-  ASSERT_EQ(proto.test(), tp->test());
-  ASSERT_EQ(proto.test(), (*tp).test());
+  ASSERT_EQ(proto.for_testing().str(), tp->for_testing().str());
+  ASSERT_EQ(proto.for_testing().str(), (*tp).for_testing().str());
 
   // Check move operators.
   TracePacket moved_tp(std::move(tp));
   ASSERT_NE(nullptr, moved_tp.operator->());
-  ASSERT_EQ(proto.test(), moved_tp->test());
+  ASSERT_EQ(proto.for_testing().str(), moved_tp->for_testing().str());
 
   TracePacket moved_tp_2;
   moved_tp_2 = std::move(moved_tp);
   ASSERT_NE(nullptr, moved_tp_2.operator->());
-  ASSERT_EQ(proto.test(), moved_tp_2->test());
+  ASSERT_EQ(proto.for_testing().str(), moved_tp_2->for_testing().str());
 }
 
 TEST(TracePacketTest, Chunked) {
   protos::TracePacket proto;
-  proto.set_test("this is an arbitrarily long string ........................");
+  proto.mutable_for_testing()->set_str(
+      "this is an arbitrarily long string ........................");
   std::string ser_buf = proto.SerializeAsString();
   TracePacket tp;
   tp.AddChunk({ser_buf.data(), 3});
@@ -81,12 +82,12 @@
 
   ASSERT_TRUE(tp.Decode());
   ASSERT_NE(nullptr, tp.operator->());
-  ASSERT_EQ(proto.test(), tp->test());
+  ASSERT_EQ(proto.for_testing().str(), tp->for_testing().str());
 }
 
 TEST(TracePacketTest, Corrupted) {
   protos::TracePacket proto;
-  proto.set_test("string field");
+  proto.mutable_for_testing()->set_str("string field");
   std::string ser_buf = proto.SerializeAsString();
   TracePacket tp;
   tp.AddChunk({ser_buf.data(), ser_buf.size() - 2});  // corrupted.
diff --git a/src/tracing/core/trace_writer_impl_unittest.cc b/src/tracing/core/trace_writer_impl_unittest.cc
index a1a042f..3fe5b47 100644
--- a/src/tracing/core/trace_writer_impl_unittest.cc
+++ b/src/tracing/core/trace_writer_impl_unittest.cc
@@ -23,6 +23,7 @@
 #include "src/tracing/core/shared_memory_arbiter_impl.h"
 #include "src/tracing/test/aligned_buffer_test.h"
 
+#include "perfetto/trace/test_event.pbzero.h"
 #include "perfetto/trace/trace_packet.pbzero.h"
 
 namespace perfetto {
@@ -63,7 +64,7 @@
     auto packet = writer->NewTracePacket();
     char str[16];
     sprintf(str, "foobar %zu", i);
-    packet->set_test(str);
+    packet->set_for_testing()->set_str(str);
   }
 
   // Destroying the TraceWriteImpl should cause the last packet to be finalized
diff --git a/src/tracing/ipc/service/producer_ipc_service.cc b/src/tracing/ipc/service/producer_ipc_service.cc
index 5b7e679..efccfb5 100644
--- a/src/tracing/ipc/service/producer_ipc_service.cc
+++ b/src/tracing/ipc/service/producer_ipc_service.cc
@@ -51,7 +51,8 @@
 void ProducerIPCService::InitializeConnection(
     const InitializeConnectionRequest& req,
     DeferredInitializeConnectionResponse response) {
-  const ipc::ClientID ipc_client_id = ipc::Service::client_info().client_id();
+  const auto& client_info = ipc::Service::client_info();
+  const ipc::ClientID ipc_client_id = client_info.client_id();
   PERFETTO_CHECK(ipc_client_id);
 
   if (producers_.count(ipc_client_id) > 0) {
@@ -65,7 +66,7 @@
 
   // ConnectProducer will call OnConnect() on the next task.
   producer->service_endpoint = core_service_->ConnectProducer(
-      producer.get(), req.shared_buffer_size_hint_bytes());
+      producer.get(), client_info.uid(), req.shared_buffer_size_hint_bytes());
   const int shm_fd = static_cast<PosixSharedMemory*>(
                          producer->service_endpoint->shared_memory())
                          ->fd();
diff --git a/src/tracing/test/tracing_integration_test.cc b/src/tracing/test/tracing_integration_test.cc
index 046e9fa..9709bf7 100644
--- a/src/tracing/test/tracing_integration_test.cc
+++ b/src/tracing/test/tracing_integration_test.cc
@@ -71,6 +71,7 @@
   MOCK_METHOD2(CreateDataSourceInstance,
                void(DataSourceInstanceID, const DataSourceConfig&));
   MOCK_METHOD1(TearDownDataSourceInstance, void(DataSourceInstanceID));
+  MOCK_METHOD0(uid, uid_t());
 };
 
 class MockConsumer : public Consumer {
@@ -165,7 +166,7 @@
   for (size_t i = 0; i < kNumPackets; i++) {
     char buf[8];
     sprintf(buf, "evt_%zu", i);
-    writer->NewTracePacket()->set_test_event()->set_str(buf, strlen(buf));
+    writer->NewTracePacket()->set_for_testing()->set_str(buf, strlen(buf));
   }
 
   // Allow the service to see the NotifySharedMemoryUpdate() before disabling
diff --git a/test/end_to_end_integrationtest.cc b/test/end_to_end_integrationtest.cc
index deb1a59..8ce6c4a 100644
--- a/test/end_to_end_integrationtest.cc
+++ b/test/end_to_end_integrationtest.cc
@@ -100,16 +100,19 @@
 
   class FakeProducerDelegate : public ThreadDelegate {
    public:
-    FakeProducerDelegate() = default;
+    FakeProducerDelegate(std::function<void()> connect_callback)
+        : connect_callback_(std::move(connect_callback)) {}
     ~FakeProducerDelegate() override = default;
 
     void Initialize(base::TaskRunner* task_runner) override {
       producer_.reset(new FakeProducer("android.perfetto.FakeProducer"));
-      producer_->Connect(TEST_PRODUCER_SOCK_NAME, task_runner);
+      producer_->Connect(TEST_PRODUCER_SOCK_NAME, task_runner,
+                         std::move(connect_callback_));
     }
 
    private:
     std::unique_ptr<FakeProducer> producer_;
+    std::function<void()> connect_callback_;
   };
 };
 
@@ -180,7 +183,7 @@
   base::TestTaskRunner task_runner;
   auto finish = task_runner.CreateCheckpoint("no.more.packets");
 
-  // Setip the TraceConfig for the consumer.
+  // Setup the TraceConfig for the consumer.
   TraceConfig trace_config;
   trace_config.add_buffers()->set_size_kb(4096 * 10);
   trace_config.set_duration_ms(200);
@@ -197,8 +200,10 @@
     if (has_more) {
       for (auto& packet : packets) {
         packet.Decode();
-        ASSERT_TRUE(packet->has_test());
-        ASSERT_EQ(packet->test(), "test");
+        ASSERT_TRUE(packet->has_for_testing());
+        ASSERT_EQ(protos::TracePacket::kTrustedUid,
+                  packet->optional_trusted_uid_case());
+        ASSERT_EQ(packet->for_testing().str(), "test");
       }
       total += packets.size();
 
@@ -216,14 +221,20 @@
   service_thread.Start(std::unique_ptr<ServiceDelegate>(new ServiceDelegate));
 #endif
 
+  auto data_produced = task_runner.CreateCheckpoint("data.produced");
   TaskRunnerThread producer_thread;
-  producer_thread.Start(
-      std::unique_ptr<FakeProducerDelegate>(new FakeProducerDelegate));
+  producer_thread.Start(std::unique_ptr<FakeProducerDelegate>(
+      new FakeProducerDelegate([&task_runner, &data_produced] {
+        task_runner.PostTask(data_produced);
+      })));
 
   // Finally, make the consumer connect to the service.
   FakeConsumer consumer(trace_config, std::move(function), &task_runner);
   consumer.Connect(TEST_CONSUMER_SOCK_NAME);
 
+  task_runner.RunUntilCheckpoint("data.produced");
+  consumer.ReadTraceData();
+
   task_runner.RunUntilCheckpoint("no.more.packets");
 }
 
diff --git a/test/fake_consumer.cc b/test/fake_consumer.cc
index 3347502..54bf7db 100644
--- a/test/fake_consumer.cc
+++ b/test/fake_consumer.cc
@@ -44,11 +44,11 @@
 
 void FakeConsumer::OnConnect() {
   endpoint_->EnableTracing(trace_config_);
-  task_runner_->PostDelayedTask(std::bind([this]() {
-                                  endpoint_->DisableTracing();
-                                  endpoint_->ReadBuffers();
-                                }),
-                                trace_config_.duration_ms());
+}
+
+void FakeConsumer::ReadTraceData() {
+  endpoint_->DisableTracing();
+  endpoint_->ReadBuffers();
 }
 
 void FakeConsumer::OnDisconnect() {
diff --git a/test/fake_consumer.h b/test/fake_consumer.h
index cad7ee0..f50b9da 100644
--- a/test/fake_consumer.h
+++ b/test/fake_consumer.h
@@ -38,6 +38,7 @@
   ~FakeConsumer() override;
 
   void Connect(const char* socket_name);
+  void ReadTraceData();
 
   // Consumer implementation.
   void OnConnect() override;
diff --git a/test/fake_producer.cc b/test/fake_producer.cc
index 2554256..ba66b66 100644
--- a/test/fake_producer.cc
+++ b/test/fake_producer.cc
@@ -30,7 +30,10 @@
 FakeProducer::~FakeProducer() = default;
 
 void FakeProducer::Connect(const char* socket_name,
-                           base::TaskRunner* task_runner) {
+                           base::TaskRunner* task_runner,
+                           std::function<void()> data_produced_callback) {
+  task_runner_ = task_runner;
+  data_produced_callback_ = std::move(data_produced_callback);
   endpoint_ = ProducerIPCClient::Connect(socket_name, this, task_runner);
 }
 
@@ -50,13 +53,18 @@
       static_cast<BufferID>(source_config.target_buffer()));
   for (int i = 0; i < 10; i++) {
     auto handle = trace_writer->NewTracePacket();
-    handle->set_test("test");
+    handle->set_for_testing()->set_str("test");
     handle->Finalize();
   }
 
   // TODO(primiano): reenable this once UnregisterDataSource is specified in
   // ServiceImpl.
   // endpoint_->UnregisterDataSource(id_);
+
+  // TODO(skyostil): There's a race here before the service processes our data
+  // and the consumer tries to retrieve it. For now wait a bit until the service
+  // is done, but we should add explicit flushing to avoid this.
+  task_runner_->PostDelayedTask(data_produced_callback_, 1000);
 }
 
 void FakeProducer::TearDownDataSourceInstance(DataSourceInstanceID) {}
diff --git a/test/fake_producer.h b/test/fake_producer.h
index 5b61468..68a89e1 100644
--- a/test/fake_producer.h
+++ b/test/fake_producer.h
@@ -33,7 +33,9 @@
   explicit FakeProducer(const std::string& name);
   ~FakeProducer() override;
 
-  void Connect(const char* socket_name, base::TaskRunner* task_runner);
+  void Connect(const char* socket_name,
+               base::TaskRunner* task_runner,
+               std::function<void()> data_produced_callback);
 
   // Producer implementation.
   void OnConnect() override;
@@ -48,6 +50,8 @@
   std::string name_;
   DataSourceID id_ = 0;
   std::unique_ptr<Service::ProducerEndpoint> endpoint_;
+  base::TaskRunner* task_runner_ = nullptr;
+  std::function<void()> data_produced_callback_;
 };
 
 }  // namespace perfetto