Reset ProbesProducer's state when service connection is lost

Fixes a lifetime bug in traced_probes. When we lose the connection to
the service, we reset the IPC |endpoint_| and re-attempt the
connection. However, |endpoint_| owns the shared memory buffer that
the various TraceWriter instances hold on to. We cannot destroy
|endpoint_| without guaranteeing that all the TraceWriter(s) have
been destroyed as well.
This CL achieves this by simply invoking the destructor (which in
turn destroys all the SinkDelegates and the various controllers)
and then the constructor again, starting over with a brand new
instance. This, of course, relies on the destruction order being
correct, but at least it doesn't introduce a secondary teardown path.
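
For readers unfamiliar with the pattern, the following is a minimal,
standalone sketch of "destroy, then placement-new on the same storage".
It is not the ProbesProducer code: Buffer, Writer, Producer and
g_live_buffers are hypothetical stand-ins, and the sketch assumes the
class has no const or reference members and is not used as a base class.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <new>
#include <vector>

// Hypothetical counter used only to observe that the old buffer is freed.
int g_live_buffers = 0;

// Hypothetical stand-in for the shared memory buffer owned by the endpoint.
struct Buffer {
  Buffer() { ++g_live_buffers; }
  ~Buffer() { --g_live_buffers; }
};

// Hypothetical stand-in for a TraceWriter: keeps a raw pointer to Buffer.
struct Writer {
  explicit Writer(Buffer* b) : buf(b) {}
  Buffer* buf;
};

class Producer {
 public:
  Producer() = default;

  void Connect(const char* socket_name) {
    socket_name_ = socket_name;
    endpoint_ = std::make_unique<Buffer>();
    writers_.push_back(std::make_unique<Writer>(endpoint_.get()));
    connected_ = true;
  }

  void Restart() {
    assert(connected_);
    // Copy what must survive into locals: every member is dead once the
    // destructor has run.
    const char* socket_name = socket_name_;

    this->~Producer();      // Tears down writers_ first, then endpoint_.
    new (this) Producer();  // Fresh instance in the same storage.

    Connect(socket_name);
  }

  std::size_t writer_count() const { return writers_.size(); }
  bool connected() const { return connected_; }

 private:
  // Members are destroyed in reverse declaration order, so writers_ must be
  // declared after endpoint_ to be destroyed before the buffer it points to.
  const char* socket_name_ = nullptr;
  bool connected_ = false;
  std::unique_ptr<Buffer> endpoint_;
  std::vector<std::unique_ptr<Writer>> writers_;
};
```

In this sketch the only thing keeping the writers from outliving the
buffer is the member declaration order, which is the same kind of
dependency on destruction order mentioned above.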

Bug: 77282805
Test: see repro steps of b/77282805.
Change-Id: Ieec7ab934c796222fea15365ee7c98d2db75cd6d
diff --git a/src/traced/probes/probes_producer.cc b/src/traced/probes/probes_producer.cc
index 3c3a532..5dc47dc 100644
--- a/src/traced/probes/probes_producer.cc
+++ b/src/traced/probes/probes_producer.cc
@@ -80,15 +80,33 @@
 
 void ProbesProducer::OnDisconnect() {
   PERFETTO_DCHECK(state_ == kConnected || state_ == kConnecting);
-  state_ = kNotConnected;
   PERFETTO_LOG("Disconnected from tracing service");
-  IncreaseConnectionBackoff();
+  if (state_ == kConnected)
+    return task_runner_->PostTask([this] { this->Restart(); });
 
-  // TODO(hjd): Erase all sinks and add e2e test for this.
+  state_ = kNotConnected;
+  IncreaseConnectionBackoff();
   task_runner_->PostDelayedTask([this] { this->Connect(); },
                                 connection_backoff_ms_);
 }
 
+void ProbesProducer::Restart() {
+  // We lost the connection with the tracing service. At this point we need
+  // to reset all the data sources. Trying to handle that manually is going to
+  // be error-prone. What we do here is simply destroy the instance and
+  // recreate it.
+  // TODO(hjd): Add e2e test for this.
+
+  base::TaskRunner* task_runner = task_runner_;
+  const char* socket_name = socket_name_;
+
+  // Invoke destructor and then the constructor again.
+  this->~ProbesProducer();
+  new (this) ProbesProducer();
+
+  ConnectWithRetries(socket_name, task_runner);
+}
+
 void ProbesProducer::CreateDataSourceInstance(DataSourceInstanceID instance_id,
                                               const DataSourceConfig& config) {
   // TODO(hjd): This a hack since we don't actually know the session id. For