service: Add support for ObservableEvents.

Adds a Consumer-facing interface for observable events.
Implements a first such event for data source instance state changes.

Bug: 127948038
Change-Id: Ia05cfef7289eb0237b14f0fb7823f63aec7ee95d
diff --git a/Android.bp b/Android.bp
index aa87928..e28d21c 100644
--- a/Android.bp
+++ b/Android.bp
@@ -86,6 +86,7 @@
     "src/tracing/core/id_allocator.cc",
     "src/tracing/core/inode_file_config.cc",
     "src/tracing/core/null_trace_writer.cc",
+    "src/tracing/core/observable_events.cc",
     "src/tracing/core/packet_stream_validator.cc",
     "src/tracing/core/process_stats_config.cc",
     "src/tracing/core/shared_memory_abi.cc",
@@ -290,6 +291,7 @@
     "src/tracing/core/id_allocator.cc",
     "src/tracing/core/inode_file_config.cc",
     "src/tracing/core/null_trace_writer.cc",
+    "src/tracing/core/observable_events.cc",
     "src/tracing/core/packet_stream_validator.cc",
     "src/tracing/core/process_stats_config.cc",
     "src/tracing/core/shared_memory_abi.cc",
@@ -444,6 +446,7 @@
     "src/tracing/core/id_allocator.cc",
     "src/tracing/core/inode_file_config.cc",
     "src/tracing/core/null_trace_writer.cc",
+    "src/tracing/core/observable_events.cc",
     "src/tracing/core/packet_stream_validator.cc",
     "src/tracing/core/process_stats_config.cc",
     "src/tracing/core/shared_memory_abi.cc",
@@ -649,6 +652,7 @@
     "src/tracing/core/id_allocator.cc",
     "src/tracing/core/inode_file_config.cc",
     "src/tracing/core/null_trace_writer.cc",
+    "src/tracing/core/observable_events.cc",
     "src/tracing/core/packet_stream_validator.cc",
     "src/tracing/core/process_stats_config.cc",
     "src/tracing/core/shared_memory_abi.cc",
@@ -736,6 +740,7 @@
   srcs: [
     "protos/perfetto/common/android_log_constants.proto",
     "protos/perfetto/common/commit_data_request.proto",
+    "protos/perfetto/common/observable_events.proto",
     "protos/perfetto/common/sys_stats_counters.proto",
     "protos/perfetto/common/trace_stats.proto",
   ],
@@ -746,6 +751,7 @@
   out: [
     "external/perfetto/protos/perfetto/common/android_log_constants.pb.cc",
     "external/perfetto/protos/perfetto/common/commit_data_request.pb.cc",
+    "external/perfetto/protos/perfetto/common/observable_events.pb.cc",
     "external/perfetto/protos/perfetto/common/sys_stats_counters.pb.cc",
     "external/perfetto/protos/perfetto/common/trace_stats.pb.cc",
   ],
@@ -757,6 +763,7 @@
   srcs: [
     "protos/perfetto/common/android_log_constants.proto",
     "protos/perfetto/common/commit_data_request.proto",
+    "protos/perfetto/common/observable_events.proto",
     "protos/perfetto/common/sys_stats_counters.proto",
     "protos/perfetto/common/trace_stats.proto",
   ],
@@ -767,6 +774,7 @@
   out: [
     "external/perfetto/protos/perfetto/common/android_log_constants.pb.h",
     "external/perfetto/protos/perfetto/common/commit_data_request.pb.h",
+    "external/perfetto/protos/perfetto/common/observable_events.pb.h",
     "external/perfetto/protos/perfetto/common/sys_stats_counters.pb.h",
     "external/perfetto/protos/perfetto/common/trace_stats.pb.h",
   ],
@@ -781,6 +789,7 @@
   srcs: [
     "protos/perfetto/common/android_log_constants.proto",
     "protos/perfetto/common/commit_data_request.proto",
+    "protos/perfetto/common/observable_events.proto",
     "protos/perfetto/common/sys_stats_counters.proto",
     "protos/perfetto/common/trace_stats.proto",
   ],
@@ -792,6 +801,7 @@
   out: [
     "external/perfetto/protos/perfetto/common/android_log_constants.pbzero.cc",
     "external/perfetto/protos/perfetto/common/commit_data_request.pbzero.cc",
+    "external/perfetto/protos/perfetto/common/observable_events.pbzero.cc",
     "external/perfetto/protos/perfetto/common/sys_stats_counters.pbzero.cc",
     "external/perfetto/protos/perfetto/common/trace_stats.pbzero.cc",
   ],
@@ -803,6 +813,7 @@
   srcs: [
     "protos/perfetto/common/android_log_constants.proto",
     "protos/perfetto/common/commit_data_request.proto",
+    "protos/perfetto/common/observable_events.proto",
     "protos/perfetto/common/sys_stats_counters.proto",
     "protos/perfetto/common/trace_stats.proto",
   ],
@@ -814,6 +825,7 @@
   out: [
     "external/perfetto/protos/perfetto/common/android_log_constants.pbzero.h",
     "external/perfetto/protos/perfetto/common/commit_data_request.pbzero.h",
+    "external/perfetto/protos/perfetto/common/observable_events.pbzero.h",
     "external/perfetto/protos/perfetto/common/sys_stats_counters.pbzero.h",
     "external/perfetto/protos/perfetto/common/trace_stats.pbzero.h",
   ],
@@ -2615,6 +2627,7 @@
     "src/tracing/core/id_allocator.cc",
     "src/tracing/core/inode_file_config.cc",
     "src/tracing/core/null_trace_writer.cc",
+    "src/tracing/core/observable_events.cc",
     "src/tracing/core/packet_stream_validator.cc",
     "src/tracing/core/process_stats_config.cc",
     "src/tracing/core/shared_memory_abi.cc",
@@ -2957,6 +2970,7 @@
     "src/tracing/core/inode_file_config.cc",
     "src/tracing/core/null_trace_writer.cc",
     "src/tracing/core/null_trace_writer_unittest.cc",
+    "src/tracing/core/observable_events.cc",
     "src/tracing/core/packet_stream_validator.cc",
     "src/tracing/core/packet_stream_validator_unittest.cc",
     "src/tracing/core/patch_list_unittest.cc",
diff --git a/include/perfetto/tracing/core/BUILD.gn b/include/perfetto/tracing/core/BUILD.gn
index d9e375c..1812133 100644
--- a/include/perfetto/tracing/core/BUILD.gn
+++ b/include/perfetto/tracing/core/BUILD.gn
@@ -22,6 +22,7 @@
     "consumer.h",
     "data_source_config.h",
     "data_source_descriptor.h",
+    "observable_events.h",
     "producer.h",
     "shared_memory.h",
     "shared_memory_abi.h",
diff --git a/include/perfetto/tracing/core/consumer.h b/include/perfetto/tracing/core/consumer.h
index 6e5e779..cf92253 100644
--- a/include/perfetto/tracing/core/consumer.h
+++ b/include/perfetto/tracing/core/consumer.h
@@ -17,11 +17,11 @@
 #ifndef INCLUDE_PERFETTO_TRACING_CORE_CONSUMER_H_
 #define INCLUDE_PERFETTO_TRACING_CORE_CONSUMER_H_
 
-#include "perfetto/tracing/core/basic_types.h"
-
 #include <vector>
 
 #include "perfetto/base/export.h"
+#include "perfetto/tracing/core/basic_types.h"
+#include "perfetto/tracing/core/observable_events.h"
 
 namespace perfetto {
 
@@ -73,6 +73,11 @@
   // Called back by the Service (or transport layer) after invoking
   // TracingService::ConsumerEndpoint::GetTraceStats().
   virtual void OnTraceStats(bool success, const TraceStats&) = 0;
+
+  // Called back by the Service (or transport layer) after invoking
+  // TracingService::ConsumerEndpoint::ObserveEvents() whenever one or more
+  // ObservableEvents of enabled event types occur.
+  virtual void OnObservableEvents(const ObservableEvents&) = 0;
 };
 
 }  // namespace perfetto
diff --git a/include/perfetto/tracing/core/observable_events.h b/include/perfetto/tracing/core/observable_events.h
new file mode 100644
index 0000000..0429411
--- /dev/null
+++ b/include/perfetto/tracing/core/observable_events.h
@@ -0,0 +1,137 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*******************************************************************************
+ * AUTOGENERATED - DO NOT EDIT
+ *******************************************************************************
+ * This file has been generated from the protobuf message
+ * perfetto/common/observable_events.proto
+ * by
+ * ../../tools/proto_to_cpp/proto_to_cpp.cc.
+ * If you need to make changes here, change the .proto file and then run
+ * ./tools/gen_tracing_cpp_headers_from_protos
+ */
+
+#ifndef INCLUDE_PERFETTO_TRACING_CORE_OBSERVABLE_EVENTS_H_
+#define INCLUDE_PERFETTO_TRACING_CORE_OBSERVABLE_EVENTS_H_
+
+#include <stdint.h>
+#include <string>
+#include <type_traits>
+#include <vector>
+
+#include "perfetto/base/export.h"
+
+// Forward declarations for protobuf types.
+namespace perfetto {
+namespace protos {
+class ObservableEvents;
+class ObservableEvents_DataSourceInstanceStateChange;
+}  // namespace protos
+}  // namespace perfetto
+
+namespace perfetto {
+
+class PERFETTO_EXPORT ObservableEvents {
+ public:
+  class PERFETTO_EXPORT DataSourceInstanceStateChange {
+   public:
+    enum DataSourceInstanceState {
+      DATA_SOURCE_INSTANCE_STATE_STOPPED = 1,
+      DATA_SOURCE_INSTANCE_STATE_STARTED = 2,
+    };
+    DataSourceInstanceStateChange();
+    ~DataSourceInstanceStateChange();
+    DataSourceInstanceStateChange(DataSourceInstanceStateChange&&) noexcept;
+    DataSourceInstanceStateChange& operator=(DataSourceInstanceStateChange&&);
+    DataSourceInstanceStateChange(const DataSourceInstanceStateChange&);
+    DataSourceInstanceStateChange& operator=(
+        const DataSourceInstanceStateChange&);
+    bool operator==(const DataSourceInstanceStateChange&) const;
+    bool operator!=(const DataSourceInstanceStateChange& other) const {
+      return !(*this == other);
+    }
+
+    // Conversion methods from/to the corresponding protobuf types.
+    void FromProto(const perfetto::protos::
+                       ObservableEvents_DataSourceInstanceStateChange&);
+    void ToProto(
+        perfetto::protos::ObservableEvents_DataSourceInstanceStateChange*)
+        const;
+
+    const std::string& producer_name() const { return producer_name_; }
+    void set_producer_name(const std::string& value) { producer_name_ = value; }
+
+    const std::string& data_source_name() const { return data_source_name_; }
+    void set_data_source_name(const std::string& value) {
+      data_source_name_ = value;
+    }
+
+    DataSourceInstanceState state() const { return state_; }
+    void set_state(DataSourceInstanceState value) { state_ = value; }
+
+   private:
+    std::string producer_name_ = {};
+    std::string data_source_name_ = {};
+    DataSourceInstanceState state_ = {};
+
+    // Allows to preserve unknown protobuf fields for compatibility
+    // with future versions of .proto files.
+    std::string unknown_fields_;
+  };
+
+  ObservableEvents();
+  ~ObservableEvents();
+  ObservableEvents(ObservableEvents&&) noexcept;
+  ObservableEvents& operator=(ObservableEvents&&);
+  ObservableEvents(const ObservableEvents&);
+  ObservableEvents& operator=(const ObservableEvents&);
+  bool operator==(const ObservableEvents&) const;
+  bool operator!=(const ObservableEvents& other) const {
+    return !(*this == other);
+  }
+
+  // Conversion methods from/to the corresponding protobuf types.
+  void FromProto(const perfetto::protos::ObservableEvents&);
+  void ToProto(perfetto::protos::ObservableEvents*) const;
+
+  int instance_state_changes_size() const {
+    return static_cast<int>(instance_state_changes_.size());
+  }
+  const std::vector<DataSourceInstanceStateChange>& instance_state_changes()
+      const {
+    return instance_state_changes_;
+  }
+  std::vector<DataSourceInstanceStateChange>* mutable_instance_state_changes() {
+    return &instance_state_changes_;
+  }
+  void clear_instance_state_changes() { instance_state_changes_.clear(); }
+  DataSourceInstanceStateChange* add_instance_state_changes() {
+    instance_state_changes_.emplace_back();
+    return &instance_state_changes_.back();
+  }
+
+ private:
+  std::vector<DataSourceInstanceStateChange> instance_state_changes_;
+
+  // Allows to preserve unknown protobuf fields for compatibility
+  // with future versions of .proto files.
+  std::string unknown_fields_;
+};
+
+}  // namespace perfetto
+
+#endif  // INCLUDE_PERFETTO_TRACING_CORE_OBSERVABLE_EVENTS_H_
diff --git a/include/perfetto/tracing/core/tracing_service.h b/include/perfetto/tracing/core/tracing_service.h
index 24e6e86..59cc47a 100644
--- a/include/perfetto/tracing/core/tracing_service.h
+++ b/include/perfetto/tracing/core/tracing_service.h
@@ -188,6 +188,20 @@
 
     // Will call OnTraceStats().
     virtual void GetTraceStats() = 0;
+
+    enum ObservableEventType : uint32_t {
+      kNone = 0,
+      kDataSourceInstances = 1 << 0
+    };
+
+    // Start or stop observing events of selected types. |enabled_event_types|
+    // specifies the types of events to observe in a bitmask (see
+    // ObservableEventType enum). To disable observing, pass
+    // ObservableEventType::kNone. Will call OnObservableEvents() repeatedly
+    // whenever an event of an enabled ObservableEventType occurs.
+    //
+    // TODO(eseckler): Extend this to support producers & data sources.
+    virtual void ObserveEvents(uint32_t enabled_event_types) = 0;
   };  // class ConsumerEndpoint.
 
   // Implemented in src/core/tracing_service_impl.cc .
diff --git a/protos/perfetto/common/BUILD.gn b/protos/perfetto/common/BUILD.gn
index fb34919..a67b42d 100644
--- a/protos/perfetto/common/BUILD.gn
+++ b/protos/perfetto/common/BUILD.gn
@@ -19,6 +19,7 @@
 common_sources = [
   "commit_data_request.proto",
   "android_log_constants.proto",
+  "observable_events.proto",
   "sys_stats_counters.proto",
   "trace_stats.proto",
 ]
diff --git a/protos/perfetto/common/observable_events.proto b/protos/perfetto/common/observable_events.proto
new file mode 100644
index 0000000..e1256f8
--- /dev/null
+++ b/protos/perfetto/common/observable_events.proto
@@ -0,0 +1,44 @@
+/*
+ * Copyright (C) 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto2";
+option optimize_for = LITE_RUNTIME;
+
+package perfetto.protos;
+
+message ObservableEvents {
+  enum Type {
+    TYPE_UNSPECIFIED = 0;
+    // State changes of data source instances associated with the consumer's
+    // session. Note that not all data sources may support these notifications.
+    TYPE_DATA_SOURCES_INSTANCES = 1;
+    // TODO(eseckler): Extend this for producer & data source registrations.
+  }
+
+  enum DataSourceInstanceState {
+    // A data source is created in stopped state.
+    DATA_SOURCE_INSTANCE_STATE_STOPPED = 1;
+    DATA_SOURCE_INSTANCE_STATE_STARTED = 2;
+  }
+
+  message DataSourceInstanceStateChange {
+    optional string producer_name = 1;
+    optional string data_source_name = 2;
+    optional DataSourceInstanceState state = 3;
+  }
+
+  repeated DataSourceInstanceStateChange instance_state_changes = 1;
+}
diff --git a/protos/perfetto/ipc/consumer_port.proto b/protos/perfetto/ipc/consumer_port.proto
index e2d3dd1..5c293ab 100644
--- a/protos/perfetto/ipc/consumer_port.proto
+++ b/protos/perfetto/ipc/consumer_port.proto
@@ -17,6 +17,7 @@
 syntax = "proto2";
 option optimize_for = LITE_RUNTIME;
 
+import "perfetto/common/observable_events.proto";
 import "perfetto/common/trace_stats.proto";
 import "perfetto/config/trace_config.proto";
 
@@ -93,6 +94,11 @@
   // such as buffer usage stats. Intended for debugging or UI use.
   rpc GetTraceStats(GetTraceStatsRequest) returns (GetTraceStatsResponse) {}
 
+  // Allows the consumer to observe certain state changes, such as data source
+  // instances starting to record.
+  rpc ObserveEvents(ObserveEventsRequest)
+      returns (stream ObserveEventsResponse) {}
+
   // TODO rpc ListDataSources(), for the UI.
 }
 
@@ -195,3 +201,15 @@
 message GetTraceStatsResponse {
   optional TraceStats trace_stats = 1;
 }
+
+// Arguments for rpc ObserveEvents.
+
+// To stop observing events of a certain type, send a request with the remaining
+// types. To stop observing completely, send an empty request.
+message ObserveEventsRequest {
+  repeated ObservableEvents.Type events_to_observe = 1;
+}
+
+message ObserveEventsResponse {
+  optional ObservableEvents events = 1;
+}
diff --git a/src/perfetto_cmd/perfetto_cmd.cc b/src/perfetto_cmd/perfetto_cmd.cc
index bf890b7..74d8c59 100644
--- a/src/perfetto_cmd/perfetto_cmd.cc
+++ b/src/perfetto_cmd/perfetto_cmd.cc
@@ -711,6 +711,9 @@
   // TODO(eseckler): Support GetTraceStats().
 }
 
+void PerfettoCmd::OnObservableEvents(
+    const ObservableEvents& /*observable_events*/) {}
+
 int __attribute__((visibility("default")))
 PerfettoCmdMain(int argc, char** argv) {
   g_consumer_cmd = new perfetto::PerfettoCmd();
diff --git a/src/perfetto_cmd/perfetto_cmd.h b/src/perfetto_cmd/perfetto_cmd.h
index 8c6fbc7..122f042 100644
--- a/src/perfetto_cmd/perfetto_cmd.h
+++ b/src/perfetto_cmd/perfetto_cmd.h
@@ -61,6 +61,7 @@
   void OnDetach(bool) override;
   void OnAttach(bool, const TraceConfig&) override;
   void OnTraceStats(bool, const TraceStats&) override;
+  void OnObservableEvents(const ObservableEvents&) override;
 
   void SignalCtrlC() { ctrl_c_evt_.Notify(); }
 
diff --git a/src/tracing/BUILD.gn b/src/tracing/BUILD.gn
index b018f84..f697933 100644
--- a/src/tracing/BUILD.gn
+++ b/src/tracing/BUILD.gn
@@ -45,6 +45,7 @@
     "core/inode_file_config.cc",
     "core/null_trace_writer.cc",
     "core/null_trace_writer.h",
+    "core/observable_events.cc",
     "core/packet_stream_validator.cc",
     "core/packet_stream_validator.h",
     "core/patch_list.h",
diff --git a/src/tracing/api_impl/consumer_api.cc b/src/tracing/api_impl/consumer_api.cc
index 544926e..286b6b4 100644
--- a/src/tracing/api_impl/consumer_api.cc
+++ b/src/tracing/api_impl/consumer_api.cc
@@ -92,6 +92,7 @@
   void OnDetach(bool) override;
   void OnAttach(bool, const TraceConfig&) override;
   void OnTraceStats(bool, const TraceStats&) override;
+  void OnObservableEvents(const ObservableEvents&) override;
 
  private:
   TracingSession(const TracingSession&) = delete;
@@ -241,6 +242,11 @@
   PERFETTO_DCHECK(false);
 }
 
+void TracingSession::OnObservableEvents(const ObservableEvents&) {
+  // Should never be called, ObserveEvents() is not used here.
+  PERFETTO_DCHECK(false);
+}
+
 void TracingSession::DestroyConnection() {
   // Destroys the connection in a separate task. This is to avoid destroying
   // the IPC connection directly from within the IPC callback.
diff --git a/src/tracing/core/observable_events.cc b/src/tracing/core/observable_events.cc
new file mode 100644
index 0000000..9c62124
--- /dev/null
+++ b/src/tracing/core/observable_events.cc
@@ -0,0 +1,132 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*******************************************************************************
+ * AUTOGENERATED - DO NOT EDIT
+ *******************************************************************************
+ * This file has been generated from the protobuf message
+ * perfetto/common/observable_events.proto
+ * by
+ * ../../tools/proto_to_cpp/proto_to_cpp.cc.
+ * If you need to make changes here, change the .proto file and then run
+ * ./tools/gen_tracing_cpp_headers_from_protos
+ */
+
+#include "perfetto/tracing/core/observable_events.h"
+
+#include "perfetto/common/observable_events.pb.h"
+
+namespace perfetto {
+
+ObservableEvents::ObservableEvents() = default;
+ObservableEvents::~ObservableEvents() = default;
+ObservableEvents::ObservableEvents(const ObservableEvents&) = default;
+ObservableEvents& ObservableEvents::operator=(const ObservableEvents&) =
+    default;
+ObservableEvents::ObservableEvents(ObservableEvents&&) noexcept = default;
+ObservableEvents& ObservableEvents::operator=(ObservableEvents&&) = default;
+
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wfloat-equal"
+bool ObservableEvents::operator==(const ObservableEvents& other) const {
+  return (instance_state_changes_ == other.instance_state_changes_);
+}
+#pragma GCC diagnostic pop
+
+void ObservableEvents::FromProto(
+    const perfetto::protos::ObservableEvents& proto) {
+  instance_state_changes_.clear();
+  for (const auto& field : proto.instance_state_changes()) {
+    instance_state_changes_.emplace_back();
+    instance_state_changes_.back().FromProto(field);
+  }
+  unknown_fields_ = proto.unknown_fields();
+}
+
+void ObservableEvents::ToProto(
+    perfetto::protos::ObservableEvents* proto) const {
+  proto->Clear();
+
+  for (const auto& it : instance_state_changes_) {
+    auto* entry = proto->add_instance_state_changes();
+    it.ToProto(entry);
+  }
+  *(proto->mutable_unknown_fields()) = unknown_fields_;
+}
+
+ObservableEvents::DataSourceInstanceStateChange::
+    DataSourceInstanceStateChange() = default;
+ObservableEvents::DataSourceInstanceStateChange::
+    ~DataSourceInstanceStateChange() = default;
+ObservableEvents::DataSourceInstanceStateChange::DataSourceInstanceStateChange(
+    const ObservableEvents::DataSourceInstanceStateChange&) = default;
+ObservableEvents::DataSourceInstanceStateChange&
+ObservableEvents::DataSourceInstanceStateChange::operator=(
+    const ObservableEvents::DataSourceInstanceStateChange&) = default;
+ObservableEvents::DataSourceInstanceStateChange::DataSourceInstanceStateChange(
+    ObservableEvents::DataSourceInstanceStateChange&&) noexcept = default;
+ObservableEvents::DataSourceInstanceStateChange&
+ObservableEvents::DataSourceInstanceStateChange::operator=(
+    ObservableEvents::DataSourceInstanceStateChange&&) = default;
+
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wfloat-equal"
+bool ObservableEvents::DataSourceInstanceStateChange::operator==(
+    const ObservableEvents::DataSourceInstanceStateChange& other) const {
+  return (producer_name_ == other.producer_name_) &&
+         (data_source_name_ == other.data_source_name_) &&
+         (state_ == other.state_);
+}
+#pragma GCC diagnostic pop
+
+void ObservableEvents::DataSourceInstanceStateChange::FromProto(
+    const perfetto::protos::ObservableEvents_DataSourceInstanceStateChange&
+        proto) {
+  static_assert(sizeof(producer_name_) == sizeof(proto.producer_name()),
+                "size mismatch");
+  producer_name_ = static_cast<decltype(producer_name_)>(proto.producer_name());
+
+  static_assert(sizeof(data_source_name_) == sizeof(proto.data_source_name()),
+                "size mismatch");
+  data_source_name_ =
+      static_cast<decltype(data_source_name_)>(proto.data_source_name());
+
+  static_assert(sizeof(state_) == sizeof(proto.state()), "size mismatch");
+  state_ = static_cast<decltype(state_)>(proto.state());
+  unknown_fields_ = proto.unknown_fields();
+}
+
+void ObservableEvents::DataSourceInstanceStateChange::ToProto(
+    perfetto::protos::ObservableEvents_DataSourceInstanceStateChange* proto)
+    const {
+  proto->Clear();
+
+  static_assert(sizeof(producer_name_) == sizeof(proto->producer_name()),
+                "size mismatch");
+  proto->set_producer_name(
+      static_cast<decltype(proto->producer_name())>(producer_name_));
+
+  static_assert(sizeof(data_source_name_) == sizeof(proto->data_source_name()),
+                "size mismatch");
+  proto->set_data_source_name(
+      static_cast<decltype(proto->data_source_name())>(data_source_name_));
+
+  static_assert(sizeof(state_) == sizeof(proto->state()), "size mismatch");
+  proto->set_state(static_cast<decltype(proto->state())>(state_));
+  *(proto->mutable_unknown_fields()) = unknown_fields_;
+}
+
+}  // namespace perfetto
diff --git a/src/tracing/core/tracing_service_impl.cc b/src/tracing/core/tracing_service_impl.cc
index 5b6e35d..ba15a1b 100644
--- a/src/tracing/core/tracing_service_impl.cc
+++ b/src/tracing/core/tracing_service_impl.cc
@@ -2073,12 +2073,80 @@
   });
 }
 
+void TracingServiceImpl::ConsumerEndpointImpl::ObserveEvents(
+    uint32_t enabled_event_types) {
+  PERFETTO_DCHECK_THREAD(thread_checker_);
+  enabled_observable_event_types_ = enabled_event_types;
+
+  if (enabled_observable_event_types_ == ObservableEventType::kNone)
+    return;
+
+  PERFETTO_DCHECK(enabled_observable_event_types_ ==
+                  ObservableEventType::kDataSourceInstances);
+
+  TracingSession* session = service_->GetTracingSession(tracing_session_id_);
+  if (!session)
+    return;
+
+  // Issue initial states
+  for (const auto& kv : session->data_source_instances) {
+    ProducerEndpointImpl* producer = service_->GetProducer(kv.first);
+    PERFETTO_DCHECK(producer);
+    OnDataSourceInstanceStateChange(*producer, kv.second);
+  }
+}
+
+void TracingServiceImpl::ConsumerEndpointImpl::OnDataSourceInstanceStateChange(
+    const ProducerEndpointImpl& producer,
+    const DataSourceInstance& instance) {
+  if (!(enabled_observable_event_types_ &
+        ObservableEventType::kDataSourceInstances)) {
+    return;
+  }
+
+  if (instance.state != DataSourceInstance::CONFIGURED &&
+      instance.state != DataSourceInstance::STARTED &&
+      instance.state != DataSourceInstance::STOPPED) {
+    return;
+  }
+
+  auto* observable_events = AddObservableEvents();
+  auto* change = observable_events->add_instance_state_changes();
+  change->set_producer_name(producer.name_);
+  change->set_data_source_name(instance.data_source_name);
+  if (instance.state == DataSourceInstance::STARTED) {
+    change->set_state(ObservableEvents::DataSourceInstanceStateChange::
+                          DATA_SOURCE_INSTANCE_STATE_STARTED);
+  } else {
+    change->set_state(ObservableEvents::DataSourceInstanceStateChange::
+                          DATA_SOURCE_INSTANCE_STATE_STOPPED);
+  }
+}
+
 base::WeakPtr<TracingServiceImpl::ConsumerEndpointImpl>
 TracingServiceImpl::ConsumerEndpointImpl::GetWeakPtr() {
   PERFETTO_DCHECK_THREAD(thread_checker_);
   return weak_ptr_factory_.GetWeakPtr();
 }
 
+ObservableEvents*
+TracingServiceImpl::ConsumerEndpointImpl::AddObservableEvents() {
+  PERFETTO_DCHECK_THREAD(thread_checker_);
+  if (!observable_events_) {
+    observable_events_.reset(new ObservableEvents());
+    auto weak_this = GetWeakPtr();
+    task_runner_->PostTask([weak_this] {
+      if (!weak_this)
+        return;
+
+      // Move into a temporary to allow reentrancy in OnObservableEvents.
+      auto observable_events = std::move(weak_this->observable_events_);
+      weak_this->consumer_->OnObservableEvents(*observable_events);
+    });
+  }
+  return observable_events_.get();
+}
+
 ////////////////////////////////////////////////////////////////////////////////
 // TracingServiceImpl::ProducerEndpointImpl implementation
 ////////////////////////////////////////////////////////////////////////////////
diff --git a/src/tracing/core/tracing_service_impl.h b/src/tracing/core/tracing_service_impl.h
index 7346ff1..b0a9233 100644
--- a/src/tracing/core/tracing_service_impl.h
+++ b/src/tracing/core/tracing_service_impl.h
@@ -31,6 +31,7 @@
 #include "perfetto/tracing/core/basic_types.h"
 #include "perfetto/tracing/core/commit_data_request.h"
 #include "perfetto/tracing/core/data_source_descriptor.h"
+#include "perfetto/tracing/core/observable_events.h"
 #include "perfetto/tracing/core/shared_memory_abi.h"
 #include "perfetto/tracing/core/trace_config.h"
 #include "perfetto/tracing/core/trace_stats.h"
@@ -174,21 +175,35 @@
     void Detach(const std::string& key) override;
     void Attach(const std::string& key) override;
     void GetTraceStats() override;
+    void ObserveEvents(uint32_t enabled_event_types) override;
 
-    // TODO(eseckler): Notify consumer about the state change if necessary.
+    // If |observe_data_source_instances == true|, will queue a task to notify
+    // the consumer about the state change.
     void OnDataSourceInstanceStateChange(const ProducerEndpointImpl&,
-                                         const DataSourceInstance&) {}
+                                         const DataSourceInstance&);
 
    private:
     friend class TracingServiceImpl;
     ConsumerEndpointImpl(const ConsumerEndpointImpl&) = delete;
     ConsumerEndpointImpl& operator=(const ConsumerEndpointImpl&) = delete;
 
+    // Returns a pointer to an ObservableEvents object that the caller can fill
+    // and schedules a task to send the ObservableEvents to the consumer.
+    ObservableEvents* AddObservableEvents();
+
     base::TaskRunner* const task_runner_;
     TracingServiceImpl* const service_;
     Consumer* const consumer_;
     uid_t const uid_;
     TracingSessionID tracing_session_id_ = 0;
+
+    // Whether the consumer is interested in DataSourceInstance state change
+    // events.
+    uint32_t enabled_observable_event_types_ = ObservableEventType::kNone;
+    // ObservableEvents that will be sent to the consumer. If set, a task to
+    // flush the events to the consumer has been queued.
+    std::unique_ptr<ObservableEvents> observable_events_;
+
     PERFETTO_THREAD_CHECKER(thread_checker_)
     base::WeakPtrFactory<ConsumerEndpointImpl> weak_ptr_factory_;  // Keep last.
   };
diff --git a/src/tracing/core/tracing_service_impl_unittest.cc b/src/tracing/core/tracing_service_impl_unittest.cc
index 7ea63d8..857efa6 100644
--- a/src/tracing/core/tracing_service_impl_unittest.cc
+++ b/src/tracing/core/tracing_service_impl_unittest.cc
@@ -1623,4 +1623,104 @@
   consumer->WaitForTracingDisabled();
 }
 
+TEST_F(TracingServiceImplTest, ObserveEventsDataSourceInstances) {
+  std::unique_ptr<MockConsumer> consumer = CreateMockConsumer();
+  consumer->Connect(svc.get());
+
+  std::unique_ptr<MockProducer> producer = CreateMockProducer();
+  producer->Connect(svc.get(), "mock_producer");
+  producer->RegisterDataSource("data_source");
+
+  TraceConfig trace_config;
+  trace_config.add_buffers()->set_size_kb(128);
+  auto* ds_config = trace_config.add_data_sources()->mutable_config();
+  ds_config->set_name("data_source");
+
+  // Start tracing before the consumer is interested in events. The consumer's
+  // OnObservableEvents() should not be called yet.
+  consumer->EnableTracing(trace_config);
+  producer->WaitForTracingSetup();
+  producer->WaitForDataSourceSetup("data_source");
+  producer->WaitForDataSourceStart("data_source");
+
+  // Calling ObserveEvents should cause an event for the initial instance state.
+  consumer->ObserveEvents(TracingService::ConsumerEndpoint::
+                              ObservableEventType::kDataSourceInstances);
+  {
+    auto events = consumer->WaitForObservableEvents();
+
+    ObservableEvents::DataSourceInstanceStateChange change;
+    change.set_producer_name("mock_producer");
+    change.set_data_source_name("data_source");
+    change.set_state(ObservableEvents::DataSourceInstanceStateChange::
+                         DATA_SOURCE_INSTANCE_STATE_STARTED);
+    EXPECT_EQ(events.instance_state_changes_size(), 1);
+    EXPECT_THAT(events.instance_state_changes(), Contains(Eq(change)));
+  }
+
+  // Disabling should cause an instance state change to STOPPED.
+  consumer->DisableTracing();
+
+  {
+    auto events = consumer->WaitForObservableEvents();
+
+    ObservableEvents::DataSourceInstanceStateChange change;
+    change.set_producer_name("mock_producer");
+    change.set_data_source_name("data_source");
+    change.set_state(ObservableEvents::DataSourceInstanceStateChange::
+                         DATA_SOURCE_INSTANCE_STATE_STOPPED);
+    EXPECT_EQ(events.instance_state_changes_size(), 1);
+    EXPECT_THAT(events.instance_state_changes(), Contains(Eq(change)));
+  }
+
+  producer->WaitForDataSourceStop("data_source");
+  consumer->WaitForTracingDisabled();
+  consumer->FreeBuffers();
+
+  // Enable again, this should cause a state change for a new instance to
+  // its initial state STOPPED.
+  trace_config.set_deferred_start(true);
+  consumer->EnableTracing(trace_config);
+
+  {
+    auto events = consumer->WaitForObservableEvents();
+
+    ObservableEvents::DataSourceInstanceStateChange change;
+    change.set_producer_name("mock_producer");
+    change.set_data_source_name("data_source");
+    change.set_state(ObservableEvents::DataSourceInstanceStateChange::
+                         DATA_SOURCE_INSTANCE_STATE_STOPPED);
+    EXPECT_EQ(events.instance_state_changes_size(), 1);
+    EXPECT_THAT(events.instance_state_changes(), Contains(Eq(change)));
+  }
+
+  producer->WaitForDataSourceSetup("data_source");
+
+  // Should move the instance into STARTED state and thus cause an event.
+  consumer->StartTracing();
+
+  {
+    auto events = consumer->WaitForObservableEvents();
+
+    ObservableEvents::DataSourceInstanceStateChange change;
+    change.set_producer_name("mock_producer");
+    change.set_data_source_name("data_source");
+    change.set_state(ObservableEvents::DataSourceInstanceStateChange::
+                         DATA_SOURCE_INSTANCE_STATE_STARTED);
+    EXPECT_EQ(events.instance_state_changes_size(), 1);
+    EXPECT_THAT(events.instance_state_changes(), Contains(Eq(change)));
+  }
+
+  producer->WaitForDataSourceStart("data_source");
+
+  // Stop observing events.
+  consumer->ObserveEvents(
+      TracingService::ConsumerEndpoint::ObservableEventType::kNone);
+
+  // Disabling should now no longer cause events to be sent to the consumer.
+  consumer->DisableTracing();
+  producer->WaitForDataSourceStop("data_source");
+  consumer->WaitForTracingDisabled();
+}
+
 }  // namespace perfetto
diff --git a/src/tracing/ipc/consumer/consumer_ipc_client_impl.cc b/src/tracing/ipc/consumer/consumer_ipc_client_impl.cc
index ece76db..26f4917 100644
--- a/src/tracing/ipc/consumer/consumer_ipc_client_impl.cc
+++ b/src/tracing/ipc/consumer/consumer_ipc_client_impl.cc
@@ -22,6 +22,7 @@
 #include "perfetto/base/task_runner.h"
 #include "perfetto/ipc/client.h"
 #include "perfetto/tracing/core/consumer.h"
+#include "perfetto/tracing/core/observable_events.h"
 #include "perfetto/tracing/core/trace_config.h"
 #include "perfetto/tracing/core/trace_stats.h"
 
@@ -279,21 +280,52 @@
 
   protos::GetTraceStatsRequest req;
   ipc::Deferred<protos::GetTraceStatsResponse> async_response;
-  auto weak_this = weak_ptr_factory_.GetWeakPtr();
 
+  // The IPC layer guarantees that callbacks are destroyed after this object
+  // is destroyed (by virtue of destroying the |consumer_port_|). In turn the
+  // contract of this class expects the caller to not destroy the Consumer class
+  // before having destroyed this class. Hence binding |this| here is safe.
   async_response.Bind(
-      [weak_this](ipc::AsyncResult<protos::GetTraceStatsResponse> response) {
-        if (!weak_this)
-          return;
+      [this](ipc::AsyncResult<protos::GetTraceStatsResponse> response) {
         TraceStats trace_stats;
         if (!response) {
-          weak_this->consumer_->OnTraceStats(/*success=*/false, trace_stats);
+          consumer_->OnTraceStats(/*success=*/false, trace_stats);
           return;
         }
         trace_stats.FromProto(response->trace_stats());
-        weak_this->consumer_->OnTraceStats(/*success=*/true, trace_stats);
+        consumer_->OnTraceStats(/*success=*/true, trace_stats);
       });
   consumer_port_.GetTraceStats(req, std::move(async_response));
 }
 
+void ConsumerIPCClientImpl::ObserveEvents(uint32_t enabled_event_types) {
+  if (!connected_) {
+    PERFETTO_DLOG("Cannot ObserveEvents(), not connected to tracing service");
+    return;
+  }
+
+  protos::ObserveEventsRequest req;
+  if (enabled_event_types & ObservableEventType::kDataSourceInstances) {
+    req.add_events_to_observe(
+        protos::ObservableEvents::TYPE_DATA_SOURCES_INSTANCES);
+  }
+  ipc::Deferred<protos::ObserveEventsResponse> async_response;
+  // The IPC layer guarantees that callbacks are destroyed after this object
+  // is destroyed (by virtue of destroying the |consumer_port_|). In turn the
+  // contract of this class expects the caller to not destroy the Consumer class
+  // before having destroyed this class. Hence binding |this| here is safe.
+  async_response.Bind(
+      [this](ipc::AsyncResult<protos::ObserveEventsResponse> response) {
+        // Skip empty response, which the service sends to close the stream.
+        if (!response->events().instance_state_changes().size()) {
+          PERFETTO_DCHECK(!response.has_more());
+          return;
+        }
+        ObservableEvents events;
+        events.FromProto(response->events());
+        consumer_->OnObservableEvents(events);
+      });
+  consumer_port_.ObserveEvents(req, std::move(async_response));
+}
+
 }  // namespace perfetto
diff --git a/src/tracing/ipc/consumer/consumer_ipc_client_impl.h b/src/tracing/ipc/consumer/consumer_ipc_client_impl.h
index 12eb030..69ab310 100644
--- a/src/tracing/ipc/consumer/consumer_ipc_client_impl.h
+++ b/src/tracing/ipc/consumer/consumer_ipc_client_impl.h
@@ -69,6 +69,7 @@
   void Detach(const std::string& key) override;
   void Attach(const std::string& key) override;
   void GetTraceStats() override;
+  void ObserveEvents(uint32_t enabled_event_types) override;
 
   // ipc::ServiceProxy::EventListener implementation.
   // These methods are invoked by the IPC layer, which knows nothing about
diff --git a/src/tracing/ipc/service/consumer_ipc_service.cc b/src/tracing/ipc/service/consumer_ipc_service.cc
index a7f3a0d..4e5afc9 100644
--- a/src/tracing/ipc/service/consumer_ipc_service.cc
+++ b/src/tracing/ipc/service/consumer_ipc_service.cc
@@ -158,6 +158,35 @@
   remote_consumer->service_endpoint->GetTraceStats();
 }
 
+// Called by the IPC layer.
+void ConsumerIPCService::ObserveEvents(const protos::ObserveEventsRequest& req,
+                                       DeferredObserveEventsResponse resp) {
+  RemoteConsumer* remote_consumer = GetConsumerForCurrentRequest();
+
+  // If there's a prior stream, close it so that client can clean it up.
+  remote_consumer->CloseObserveEventsResponseStream();
+
+  remote_consumer->observe_events_response = std::move(resp);
+
+  bool observe_instances = false;
+  for (const auto& type : req.events_to_observe()) {
+    switch (type) {
+      case protos::ObservableEvents::TYPE_DATA_SOURCES_INSTANCES:
+        observe_instances = true;
+        break;
+      default:
+        PERFETTO_DFATAL("Unknown ObservableEvent type: %d", type);
+        break;
+    }
+  }
+  remote_consumer->service_endpoint->ObserveEvents(observe_instances);
+
+  // If no events are to be observed, close the stream immediately so that the
+  // client can clean up.
+  if (req.events_to_observe().size() == 0)
+    remote_consumer->CloseObserveEventsResponseStream();
+}
+
 // Called by the service in response to a service_endpoint->Flush() request.
 void ConsumerIPCService::OnFlushCallback(
     bool success,
@@ -279,4 +308,24 @@
   std::move(get_trace_stats_response).Resolve(std::move(response));
 }
 
+void ConsumerIPCService::RemoteConsumer::OnObservableEvents(
+    const ObservableEvents& events) {
+  if (!observe_events_response.IsBound())
+    return;
+
+  auto result = ipc::AsyncResult<protos::ObserveEventsResponse>::Create();
+  result.set_has_more(true);
+  events.ToProto(result->mutable_events());
+  observe_events_response.Resolve(std::move(result));
+}
+
+void ConsumerIPCService::RemoteConsumer::CloseObserveEventsResponseStream() {
+  if (!observe_events_response.IsBound())
+    return;
+
+  auto result = ipc::AsyncResult<protos::ObserveEventsResponse>::Create();
+  result.set_has_more(false);
+  observe_events_response.Resolve(std::move(result));
+}
+
 }  // namespace perfetto
diff --git a/src/tracing/ipc/service/consumer_ipc_service.h b/src/tracing/ipc/service/consumer_ipc_service.h
index 33e5828..2752fb3 100644
--- a/src/tracing/ipc/service/consumer_ipc_service.h
+++ b/src/tracing/ipc/service/consumer_ipc_service.h
@@ -61,6 +61,8 @@
   void Attach(const protos::AttachRequest&, DeferredAttachResponse) override;
   void GetTraceStats(const protos::GetTraceStatsRequest&,
                      DeferredGetTraceStatsResponse) override;
+  void ObserveEvents(const protos::ObserveEventsRequest&,
+                     DeferredObserveEventsResponse) override;
   void OnClientDisconnected() override;
 
  private:
@@ -81,13 +83,16 @@
     void OnDetach(bool) override;
     void OnAttach(bool, const TraceConfig&) override;
     void OnTraceStats(bool, const TraceStats&) override;
+    void OnObservableEvents(const ObservableEvents&) override;
+
+    void CloseObserveEventsResponseStream();
 
     // The interface obtained from the core service business logic through
     // TracingService::ConnectConsumer(this). This allows to invoke methods for
     // a specific Consumer on the Service business logic.
     std::unique_ptr<TracingService::ConsumerEndpoint> service_endpoint;
 
-    // After DisableTracing() is invoked, this binds the async callback that
+    // After ReadBuffers() is invoked, this binds the async callback that
     // allows to stream trace packets back to the client.
     DeferredReadBuffersResponse read_buffers_response;
 
@@ -104,6 +109,10 @@
 
     // As above, but for GetTraceStats().
     DeferredGetTraceStatsResponse get_trace_stats_response;
+
+    // After ObserveEvents() is invoked, this binds the async callback that
+    // allows to stream ObservableEvents back to the client.
+    DeferredObserveEventsResponse observe_events_response;
   };
 
   // This has to be a container that doesn't invalidate iterators.
diff --git a/src/tracing/test/mock_consumer.cc b/src/tracing/test/mock_consumer.cc
index 0e50575..15c1400 100644
--- a/src/tracing/test/mock_consumer.cc
+++ b/src/tracing/test/mock_consumer.cc
@@ -142,4 +142,23 @@
   task_runner_->RunUntilCheckpoint(checkpoint_name);
 }
 
+void MockConsumer::ObserveEvents(uint32_t enabled_event_types) {
+  service_endpoint_->ObserveEvents(enabled_event_types);
+}
+
+ObservableEvents MockConsumer::WaitForObservableEvents() {
+  ObservableEvents events;
+  static int i = 0;
+  std::string checkpoint_name = "on_observable_events_" + std::to_string(i++);
+  auto on_observable_events = task_runner_->CreateCheckpoint(checkpoint_name);
+  EXPECT_CALL(*this, OnObservableEvents(_))
+      .WillOnce(Invoke([&events, on_observable_events](
+                           const ObservableEvents& observable_events) {
+        events = observable_events;
+        on_observable_events();
+      }));
+  task_runner_->RunUntilCheckpoint(checkpoint_name);
+  return events;
+}
+
 }  // namespace perfetto
diff --git a/src/tracing/test/mock_consumer.h b/src/tracing/test/mock_consumer.h
index ef89b03..a297042 100644
--- a/src/tracing/test/mock_consumer.h
+++ b/src/tracing/test/mock_consumer.h
@@ -57,6 +57,8 @@
   std::vector<protos::TracePacket> ReadBuffers();
   void GetTraceStats();
   void WaitForTraceStats(bool success);
+  void ObserveEvents(uint32_t enabled_event_types);
+  ObservableEvents WaitForObservableEvents();
 
   TracingService::ConsumerEndpoint* endpoint() {
     return service_endpoint_.get();
@@ -71,6 +73,7 @@
   MOCK_METHOD1(OnDetach, void(bool));
   MOCK_METHOD2(OnAttach, void(bool, const TraceConfig&));
   MOCK_METHOD2(OnTraceStats, void(bool, const TraceStats&));
+  MOCK_METHOD1(OnObservableEvents, void(const ObservableEvents&));
 
   // gtest doesn't support move-only types. This wrapper is here jut to pass
   // a pointer to the vector (rather than the vector itself) to the mock method.
diff --git a/src/tracing/test/tracing_integration_test.cc b/src/tracing/test/tracing_integration_test.cc
index d536ee3..161713c 100644
--- a/src/tracing/test/tracing_integration_test.cc
+++ b/src/tracing/test/tracing_integration_test.cc
@@ -80,6 +80,7 @@
   MOCK_METHOD1(OnDetach, void(bool));
   MOCK_METHOD2(OnAttach, void(bool, const TraceConfig&));
   MOCK_METHOD2(OnTraceStats, void(bool, const TraceStats&));
+  MOCK_METHOD1(OnObservableEvents, void(const ObservableEvents&));
 
   // Workaround, gmock doesn't support yet move-only types, passing a pointer.
   void OnTraceData(std::vector<TracePacket> packets, bool has_more) {
diff --git a/test/test_helper.cc b/test/test_helper.cc
index 2111c77..2d5b7e2 100644
--- a/test/test_helper.cc
+++ b/test/test_helper.cc
@@ -179,6 +179,8 @@
 
 void TestHelper::OnTraceStats(bool, const TraceStats&) {}
 
+void TestHelper::OnObservableEvents(const ObservableEvents&) {}
+
 // static
 const char* TestHelper::GetConsumerSocketName() {
   return TEST_CONSUMER_SOCK_NAME;
diff --git a/test/test_helper.h b/test/test_helper.h
index 21caa66..083a529 100644
--- a/test/test_helper.h
+++ b/test/test_helper.h
@@ -44,6 +44,7 @@
   void OnDetach(bool) override;
   void OnAttach(bool, const TraceConfig&) override;
   void OnTraceStats(bool, const TraceStats&) override;
+  void OnObservableEvents(const ObservableEvents&) override;
 
   void StartServiceIfRequired();
   FakeProducer* ConnectFakeProducer();
diff --git a/tools/gen_tracing_cpp_headers_from_protos b/tools/gen_tracing_cpp_headers_from_protos
index 8c14b6f..7728949 100755
--- a/tools/gen_tracing_cpp_headers_from_protos
+++ b/tools/gen_tracing_cpp_headers_from_protos
@@ -20,6 +20,7 @@
 PROTOS = (
   'perfetto/common/android_log_constants.proto',
   'perfetto/common/commit_data_request.proto',
+  'perfetto/common/observable_events.proto',
   'perfetto/common/sys_stats_counters.proto',
   'perfetto/common/trace_stats.proto',
   'perfetto/config/android/android_log_config.proto',