Added dynamic tables for preceding and following flows

Following/Preceding tables contain all entries of flow events table
that directly or indirectly connected to the starting slice.


FOLLOWING_FLOW(start_slice_id) - all entries of flow events table that
have flow.slice_out=start_slice_id or there is a chain of slice_ids:
start_slice_id -> slice_id_1 -> ... slice_id_n -> flow.slice_in
and each pair of neighbour slices in this chain has a flow between them

PRECEDING_FLOW(start_slice_id) - the same as FOLLOWING_FLOW but in
opposite direction.

CONNECTED_FLOW(start_slice_id) - union if following and preceding tables

Bug:153137035
Change-Id: I1c464ba35a360004b5d3c4dfd83cc31f27107ac4
diff --git a/Android.bp b/Android.bp
index 995769c..5aab3f2 100644
--- a/Android.bp
+++ b/Android.bp
@@ -6825,6 +6825,7 @@
   name: "perfetto_src_trace_processor_lib",
   srcs: [
     "src/trace_processor/dynamic/ancestor_slice_generator.cc",
+    "src/trace_processor/dynamic/connected_flow_generator.cc",
     "src/trace_processor/dynamic/descendant_slice_generator.cc",
     "src/trace_processor/dynamic/describe_slice_generator.cc",
     "src/trace_processor/dynamic/experimental_counter_dur_generator.cc",
diff --git a/BUILD b/BUILD
index 9839996..6df6e49 100644
--- a/BUILD
+++ b/BUILD
@@ -972,6 +972,8 @@
     srcs = [
         "src/trace_processor/dynamic/ancestor_slice_generator.cc",
         "src/trace_processor/dynamic/ancestor_slice_generator.h",
+        "src/trace_processor/dynamic/connected_flow_generator.cc",
+        "src/trace_processor/dynamic/connected_flow_generator.h",
         "src/trace_processor/dynamic/descendant_slice_generator.cc",
         "src/trace_processor/dynamic/descendant_slice_generator.h",
         "src/trace_processor/dynamic/describe_slice_generator.cc",
diff --git a/docs/analysis/trace-processor.md b/docs/analysis/trace-processor.md
index 6d28c2f..d054103 100644
--- a/docs/analysis/trace-processor.md
+++ b/docs/analysis/trace-processor.md
@@ -347,6 +347,31 @@
 FROM interesting_slices
 ```
 
+### Following/Preceding/Connected flows
+following_flow, preceding_flow, connected_flow are custom operator tables that
+take a [slice table's id column](/docs/analysis/sql-tables.autogen#slice) and
+collect all entries of [flow table](/docs/analysis/sql-tables.autogen#flow),
+that are directly or indirectly connected to the given starting slice.
+
+`FOLLOWING_FLOW(start_slice_id)` - contains all entries of
+[flow table](/docs/analysis/sql-tables.autogen#flow)
+that are present in any chain of kind: `flow[0] -> flow[1] -> ... -> flow[n]`,
+where `flow[i].slice_out = flow[i+1].slice_in` and
+`flow[0].slice_out = start_slice_id`.
+
+`PRECEDING_FLOW(start_slice_id)` - contains all entries of
+[flow table](/docs/analysis/sql-tables.autogen#flow)
+that are present in any chain of kind: `flow[n] -> flow[n-1] -> ... -> flow[0]`,
+where `flow[i].slice_in = flow[i+1].slice_out` and
+`flow[0].slice_in = start_slice_id`.
+
+`CONNECTED_FLOW(start_slice_id)` - contains a union of both
+`FOLLOWING_FLOW(start_slice_id)` and `PRECEDING_FLOW(start_slice_id)` tables.
+
+```sql
+--number of following flows for each slice
+SELECT (SELECT COUNT(*) FROM FOLLOWING_FLOW(slice_id)) as following FROM slice;
+```
 
 ## Metrics
 
diff --git a/src/trace_processor/BUILD.gn b/src/trace_processor/BUILD.gn
index 5270204..f86b6db 100644
--- a/src/trace_processor/BUILD.gn
+++ b/src/trace_processor/BUILD.gn
@@ -284,6 +284,8 @@
     sources = [
       "dynamic/ancestor_slice_generator.cc",
       "dynamic/ancestor_slice_generator.h",
+      "dynamic/connected_flow_generator.cc",
+      "dynamic/connected_flow_generator.h",
       "dynamic/descendant_slice_generator.cc",
       "dynamic/descendant_slice_generator.h",
       "dynamic/describe_slice_generator.cc",
diff --git a/src/trace_processor/dynamic/connected_flow_generator.cc b/src/trace_processor/dynamic/connected_flow_generator.cc
new file mode 100644
index 0000000..d2e0a9e
--- /dev/null
+++ b/src/trace_processor/dynamic/connected_flow_generator.cc
@@ -0,0 +1,157 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/trace_processor/dynamic/connected_flow_generator.h"
+
+#include <memory>
+#include <queue>
+#include <set>
+
+#include "src/trace_processor/types/trace_processor_context.h"
+
+namespace perfetto {
+namespace trace_processor {
+
+ConnectedFlowGenerator::ConnectedFlowGenerator(Direction direction,
+                                               TraceProcessorContext* context)
+    : context_(context), direction_(direction) {}
+
+ConnectedFlowGenerator::~ConnectedFlowGenerator() = default;
+
+util::Status ConnectedFlowGenerator::ValidateConstraints(
+    const QueryConstraints& qc) {
+  const auto& cs = qc.constraints();
+
+  auto flow_id_fn = [this](const QueryConstraints::Constraint& c) {
+    return c.column == static_cast<int>(
+                           context_->storage->flow_table().GetColumnCount()) &&
+           c.op == SQLITE_INDEX_CONSTRAINT_EQ;
+  };
+  bool has_flow_id_cs =
+      std::find_if(cs.begin(), cs.end(), flow_id_fn) != cs.end();
+
+  return has_flow_id_cs
+             ? util::OkStatus()
+             : util::ErrStatus("Failed to find required constraints");
+}
+
+std::vector<uint32_t> ConnectedFlowGenerator::GetConnectedFlowRows(
+    SliceId start_id,
+    Direction dir) {
+  PERFETTO_DCHECK(dir != Direction::BOTH);
+  std::vector<uint32_t> result_rows;
+
+  // TODO: add hash function for SliceId and change this to unordered_set
+  std::set<SliceId> visited_slice_ids;
+  std::queue<SliceId> slice_id_queue({start_id});
+
+  const auto& flow = context_->storage->flow_table();
+  const TypedColumn<SliceId>& start_col =
+      (dir == Direction::FOLLOWING ? flow.slice_out() : flow.slice_in());
+  const TypedColumn<SliceId>& end_col =
+      (dir == Direction::FOLLOWING ? flow.slice_in() : flow.slice_out());
+
+  while (!slice_id_queue.empty()) {
+    SliceId current_slice_id = slice_id_queue.front();
+    slice_id_queue.pop();
+    auto rows = flow.FilterToRowMap({start_col.eq(current_slice_id.value)});
+    for (auto row_it = rows.IterateRows(); row_it; row_it.Next()) {
+      SliceId next_slice_id = end_col[row_it.row()];
+      if (visited_slice_ids.find(next_slice_id) != visited_slice_ids.end()) {
+        continue;
+      }
+
+      visited_slice_ids.insert(next_slice_id);
+      slice_id_queue.push(next_slice_id);
+      result_rows.push_back(row_it.row());
+    }
+  }
+
+  return result_rows;
+}
+
+std::unique_ptr<Table> ConnectedFlowGenerator::ComputeTable(
+    const std::vector<Constraint>& cs,
+    const std::vector<Order>&) {
+  const auto& flow = context_->storage->flow_table();
+  const auto& slice = context_->storage->slice_table();
+
+  auto it = std::find_if(cs.begin(), cs.end(), [&flow](const Constraint& c) {
+    return c.col_idx == flow.GetColumnCount() && c.op == FilterOp::kEq;
+  });
+
+  PERFETTO_DCHECK(it != cs.end());
+
+  SliceId start_id{static_cast<uint32_t>(it->value.AsLong())};
+
+  if (!slice.id().IndexOf(start_id)) {
+    PERFETTO_ELOG("Given slice id is invalid (ConnectedFlowGenerator)");
+    return nullptr;
+  }
+
+  std::vector<uint32_t> result_rows;
+
+  if (direction_ != Direction::PRECEDING) {
+    // FOLLOWING or ALL_CONNECTED
+    auto rows = GetConnectedFlowRows(start_id, Direction::FOLLOWING);
+    result_rows.insert(result_rows.begin(), rows.begin(), rows.end());
+  }
+  if (direction_ != Direction::FOLLOWING) {
+    // PRECEDING or ALL_CONNECTED
+    auto rows = GetConnectedFlowRows(start_id, Direction::PRECEDING);
+    result_rows.insert(result_rows.begin(), rows.begin(), rows.end());
+  }
+
+  // Aditional column for start_id
+  std::unique_ptr<NullableVector<uint32_t>> start_ids(
+      new NullableVector<uint32_t>());
+
+  for (size_t i = 0; i < result_rows.size(); i++) {
+    start_ids->Append(start_id.value);
+  }
+
+  return std::unique_ptr<Table>(
+      new Table(flow.Apply(RowMap(std::move(result_rows)))
+                    .ExtendWithColumn("start_id", std::move(start_ids),
+                                      TypedColumn<uint32_t>::default_flags() |
+                                          TypedColumn<uint32_t>::kHidden)));
+}
+
+Table::Schema ConnectedFlowGenerator::CreateSchema() {
+  auto schema = tables::FlowTable::Schema();
+  schema.columns.push_back(Table::Schema::Column{
+      "start_id", SqlValue::Type::kLong, /* is_id = */ false,
+      /* is_sorted = */ false, /* is_hidden = */ true});
+  return schema;
+}
+
+std::string ConnectedFlowGenerator::TableName() {
+  switch (direction_) {
+    case Direction::BOTH:
+      return "connected_flow";
+    case Direction::FOLLOWING:
+      return "following_flow";
+    case Direction::PRECEDING:
+      return "preceding_flow";
+  }
+  PERFETTO_FATAL("Unexpected ConnectedFlowType");
+}
+
+uint32_t ConnectedFlowGenerator::EstimateRowCount() {
+  return 1;
+}
+}  // namespace trace_processor
+}  // namespace perfetto
diff --git a/src/trace_processor/dynamic/connected_flow_generator.h b/src/trace_processor/dynamic/connected_flow_generator.h
new file mode 100644
index 0000000..27715e1
--- /dev/null
+++ b/src/trace_processor/dynamic/connected_flow_generator.h
@@ -0,0 +1,64 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef SRC_TRACE_PROCESSOR_DYNAMIC_CONNECTED_FLOW_GENERATOR_H_
+#define SRC_TRACE_PROCESSOR_DYNAMIC_CONNECTED_FLOW_GENERATOR_H_
+
+#include "src/trace_processor/sqlite/db_sqlite_table.h"
+
+#include "src/trace_processor/storage/trace_storage.h"
+
+namespace perfetto {
+namespace trace_processor {
+
+class TraceProcessorContext;
+
+// Implementation of tables: CONNECTED_FLOW, FOLLOWING_FLOW, PERCEEDING_FLOW
+// Searches for all entries of flow events table that are dirrectly or
+// indirectly connected to the given slice (slice id). It is possible to
+// restrict the direction of search.
+class ConnectedFlowGenerator : public DbSqliteTable::DynamicTableGenerator {
+ public:
+  enum class Direction { BOTH = 0, FOLLOWING = 1, PRECEDING = 2 };
+
+  explicit ConnectedFlowGenerator(Direction type,
+                                  TraceProcessorContext* context);
+  ~ConnectedFlowGenerator() override;
+
+  Table::Schema CreateSchema() override;
+  std::string TableName() override;
+  uint32_t EstimateRowCount() override;
+  util::Status ValidateConstraints(const QueryConstraints&) override;
+  std::unique_ptr<Table> ComputeTable(const std::vector<Constraint>& cs,
+                                      const std::vector<Order>& ob) override;
+
+ private:
+  // This function runs BFS on the flow events table as on directed graph
+  // It starts from start_id slice and returns all flow rows that are
+  // directly or indirectly connected to the starting slice.
+  // If dir is FOLLOWING BFS will move in direction (slice_out -> slice_in)
+  // If dir is PRECEDING BFS will move in direction (slice_in -> slice_out)
+  // IMPORTANT: dir must not be set to BOTH for this method
+  std::vector<uint32_t> GetConnectedFlowRows(SliceId start_id, Direction dir);
+
+  TraceProcessorContext* context_ = nullptr;
+  Direction direction_;
+};
+
+}  // namespace trace_processor
+}  // namespace perfetto
+
+#endif  // SRC_TRACE_PROCESSOR_DYNAMIC_CONNECTED_FLOW_GENERATOR_H_
diff --git a/src/trace_processor/trace_processor_impl.cc b/src/trace_processor/trace_processor_impl.cc
index d4f55d0..894e6f3 100644
--- a/src/trace_processor/trace_processor_impl.cc
+++ b/src/trace_processor/trace_processor_impl.cc
@@ -24,6 +24,7 @@
 #include "perfetto/ext/base/string_splitter.h"
 #include "perfetto/ext/base/string_utils.h"
 #include "src/trace_processor/dynamic/ancestor_slice_generator.h"
+#include "src/trace_processor/dynamic/connected_flow_generator.h"
 #include "src/trace_processor/dynamic/descendant_slice_generator.h"
 #include "src/trace_processor/dynamic/describe_slice_generator.h"
 #include "src/trace_processor/dynamic/experimental_counter_dur_generator.h"
@@ -731,6 +732,15 @@
       new AncestorSliceGenerator(&context_)));
   RegisterDynamicTable(std::unique_ptr<DescendantSliceGenerator>(
       new DescendantSliceGenerator(&context_)));
+  RegisterDynamicTable(
+      std::unique_ptr<ConnectedFlowGenerator>(new ConnectedFlowGenerator(
+          ConnectedFlowGenerator::Direction::BOTH, &context_)));
+  RegisterDynamicTable(
+      std::unique_ptr<ConnectedFlowGenerator>(new ConnectedFlowGenerator(
+          ConnectedFlowGenerator::Direction::FOLLOWING, &context_)));
+  RegisterDynamicTable(
+      std::unique_ptr<ConnectedFlowGenerator>(new ConnectedFlowGenerator(
+          ConnectedFlowGenerator::Direction::PRECEDING, &context_)));
   RegisterDynamicTable(std::unique_ptr<ExperimentalSchedUpidGenerator>(
       new ExperimentalSchedUpidGenerator(storage->sched_slice_table(),
                                          storage->thread_table())));
diff --git a/test/trace_processor/dynamic/connected_flow.out b/test/trace_processor/dynamic/connected_flow.out
new file mode 100644
index 0000000..8c64ff3
--- /dev/null
+++ b/test/trace_processor/dynamic/connected_flow.out
@@ -0,0 +1,49 @@
+"type","name","start_name","end_name"
+"connected","Flow0_Slice0","Flow0_Slice0","Flow0_Slice1"
+"connected","Flow0_Slice1","Flow0_Slice0","Flow0_Slice1"
+"connected","Flow1_Slice0","Flow1_Slice0","Flow1_Slice1a"
+"connected","Flow1_Slice0","Flow1_Slice0","Flow1_Slice1b"
+"connected","Flow1_Slice1a","Flow1_Slice0","Flow1_Slice1a"
+"connected","Flow1_Slice1b","Flow1_Slice0","Flow1_Slice1b"
+"connected","Flow2_Slice0","Flow2_Slice0","Flow2_Slice1"
+"connected","Flow2_Slice0","Flow2_Slice1","Flow2_Slice2"
+"connected","Flow2_Slice0","Flow2_Slice2","Flow2_Slice3a"
+"connected","Flow2_Slice0","Flow2_Slice2","Flow2_Slice3b"
+"connected","Flow2_Slice1","Flow2_Slice0","Flow2_Slice1"
+"connected","Flow2_Slice1","Flow2_Slice1","Flow2_Slice2"
+"connected","Flow2_Slice1","Flow2_Slice2","Flow2_Slice3a"
+"connected","Flow2_Slice1","Flow2_Slice2","Flow2_Slice3b"
+"connected","Flow2_Slice2","Flow2_Slice0","Flow2_Slice1"
+"connected","Flow2_Slice2","Flow2_Slice1","Flow2_Slice2"
+"connected","Flow2_Slice2","Flow2_Slice2","Flow2_Slice3a"
+"connected","Flow2_Slice2","Flow2_Slice2","Flow2_Slice3b"
+"connected","Flow2_Slice3a","Flow2_Slice0","Flow2_Slice1"
+"connected","Flow2_Slice3a","Flow2_Slice1","Flow2_Slice2"
+"connected","Flow2_Slice3a","Flow2_Slice2","Flow2_Slice3a"
+"connected","Flow2_Slice3b","Flow2_Slice0","Flow2_Slice1"
+"connected","Flow2_Slice3b","Flow2_Slice1","Flow2_Slice2"
+"connected","Flow2_Slice3b","Flow2_Slice2","Flow2_Slice3b"
+"following","Flow0_Slice0","Flow0_Slice0","Flow0_Slice1"
+"following","Flow1_Slice0","Flow1_Slice0","Flow1_Slice1a"
+"following","Flow1_Slice0","Flow1_Slice0","Flow1_Slice1b"
+"following","Flow2_Slice0","Flow2_Slice0","Flow2_Slice1"
+"following","Flow2_Slice0","Flow2_Slice1","Flow2_Slice2"
+"following","Flow2_Slice0","Flow2_Slice2","Flow2_Slice3a"
+"following","Flow2_Slice0","Flow2_Slice2","Flow2_Slice3b"
+"following","Flow2_Slice1","Flow2_Slice1","Flow2_Slice2"
+"following","Flow2_Slice1","Flow2_Slice2","Flow2_Slice3a"
+"following","Flow2_Slice1","Flow2_Slice2","Flow2_Slice3b"
+"following","Flow2_Slice2","Flow2_Slice2","Flow2_Slice3a"
+"following","Flow2_Slice2","Flow2_Slice2","Flow2_Slice3b"
+"preceding","Flow0_Slice1","Flow0_Slice0","Flow0_Slice1"
+"preceding","Flow1_Slice1a","Flow1_Slice0","Flow1_Slice1a"
+"preceding","Flow1_Slice1b","Flow1_Slice0","Flow1_Slice1b"
+"preceding","Flow2_Slice1","Flow2_Slice0","Flow2_Slice1"
+"preceding","Flow2_Slice2","Flow2_Slice0","Flow2_Slice1"
+"preceding","Flow2_Slice2","Flow2_Slice1","Flow2_Slice2"
+"preceding","Flow2_Slice3a","Flow2_Slice0","Flow2_Slice1"
+"preceding","Flow2_Slice3a","Flow2_Slice1","Flow2_Slice2"
+"preceding","Flow2_Slice3a","Flow2_Slice2","Flow2_Slice3a"
+"preceding","Flow2_Slice3b","Flow2_Slice0","Flow2_Slice1"
+"preceding","Flow2_Slice3b","Flow2_Slice1","Flow2_Slice2"
+"preceding","Flow2_Slice3b","Flow2_Slice2","Flow2_Slice3b"
diff --git a/test/trace_processor/dynamic/connected_flow.sql b/test/trace_processor/dynamic/connected_flow.sql
new file mode 100644
index 0000000..22e0692
--- /dev/null
+++ b/test/trace_processor/dynamic/connected_flow.sql
@@ -0,0 +1,30 @@
+--
+-- Copyright 2020 The Android Open Source Project
+--
+-- Licensed under the Apache License, Version 2.0 (the "License");
+-- you may not use this file except in compliance with the License.
+-- You may obtain a copy of the License at
+--
+--     https://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+SELECT "connected" as type, s.name, s1.name as start_name, s2.name as end_name  FROM slice s
+JOIN CONNECTED_FLOW(s.id) c
+JOIN slice s1 ON s1.id = c.slice_out
+JOIN slice s2 ON s2.id = c.slice_in
+UNION
+SELECT "following" as type, s.name, s1.name as start_name, s2.name as end_name  FROM slice s
+JOIN FOLLOWING_FLOW(s.id) c
+JOIN slice s1 ON s1.id = c.slice_out
+JOIN slice s2 ON s2.id = c.slice_in
+UNION
+SELECT "preceding" as type, s.name, s1.name as start_name, s2.name as end_name  FROM slice s
+JOIN PRECEDING_FLOW(s.id) c
+JOIN slice s1 ON s1.id = c.slice_out
+JOIN slice s2 ON s2.id = c.slice_in
+ORDER BY type, s.name, s1.name, s2.name ASC
diff --git a/test/trace_processor/dynamic/connected_flow_data.json b/test/trace_processor/dynamic/connected_flow_data.json
new file mode 100644
index 0000000..ed748bc
--- /dev/null
+++ b/test/trace_processor/dynamic/connected_flow_data.json
@@ -0,0 +1,128 @@
+{
+    "traceEvents": [
+      {"args":{"name":"MainProcess"},"cat":"__metadata","name":"process_name","ph":"M","pid":100,"tid":0,"ts":0},
+      {
+        "cat": "ipc",
+        "pid": 100,
+        "tid": 15903,
+        "ts": 420,
+        "ph": "X",
+        "name": "Flow0_Slice0",
+        "args":{},
+        "dur": 150,
+        "bind_id": "0x401",
+        "flow_out": true
+      },
+      {
+        "cat": "ipc",
+        "pid": 100,
+        "tid": 15895,
+        "ts": 720,
+        "ph": "X",
+        "name": "Flow0_Slice1",
+        "args":{},
+        "dur": 200,
+        "bind_id": "0x401",
+        "flow_in": true
+      },
+      {
+        "cat": "ipc",
+        "pid": 100,
+        "tid": 15904,
+        "ts": 400,
+        "ph": "X",
+        "name": "Flow1_Slice0",
+        "args":{},
+        "dur": 600,
+        "bind_id": "0x403",
+        "flow_out": true,
+        "flow_in": false
+      },
+      {
+        "cat": "ipc",
+        "pid": 100,
+        "tid": 15903,
+        "ts": 1300,
+        "ph": "X",
+        "name": "Flow1_Slice1a",
+        "args":{},
+        "dur": 600,
+        "bind_id": "0x403",
+        "flow_in": true
+      },
+      {
+        "cat": "ipc",
+        "pid": 100,
+        "tid": 15794,
+        "ts": 1100,
+        "ph": "X",
+        "name": "Flow1_Slice1b",
+        "args":{},
+        "dur": 900,
+        "bind_id": "0x403",
+        "flow_in": true
+      },
+      {
+        "cat": "ipc",
+        "pid": 100,
+        "tid": 15903,
+        "ts": 1351,
+        "ph": "X",
+        "name": "Flow2_Slice0",
+        "args": {},
+        "dur": 500,
+        "bind_id": "0x402",
+        "flow_out": true
+      },
+      {
+        "cat": "ipc",
+        "pid": 100,
+        "tid": 15903,
+        "ts": 2002,
+        "ph": "X",
+        "name": "Flow2_Slice1",
+        "args":{},
+        "dur": 350,
+        "bind_id": "0x402",
+        "flow_out": true,
+        "flow_in": true
+      },
+      {
+        "cat": "ipc",
+        "pid": 100,
+        "tid": 15794,
+        "ts": 2400,
+        "ph": "X",
+        "name": "Flow2_Slice2",
+        "args": {},
+        "dur": 300,
+        "bind_id": "0x402",
+        "flow_in": true,
+        "flow_out": true
+      },
+      {
+        "cat": "ipc",
+        "pid": 100,
+        "tid": 15904,
+        "ts": 3031,
+        "ph": "X",
+        "name": "Flow2_Slice3a",
+        "args": {},
+        "dur": 400,
+        "bind_id": "0x402",
+        "flow_in": true
+      },
+      {
+        "cat": "ipc",
+        "pid": 100,
+        "tid": 15895,
+        "ts": 3333,
+        "ph": "X",
+        "name": "Flow2_Slice3b",
+        "args": {},
+        "dur": 400,
+        "bind_id": "0x402",
+        "flow_in": true
+      }
+    ]
+  }
\ No newline at end of file
diff --git a/test/trace_processor/dynamic/index b/test/trace_processor/dynamic/index
index 10f8253..8e76690 100644
--- a/test/trace_processor/dynamic/index
+++ b/test/trace_processor/dynamic/index
@@ -4,4 +4,7 @@
 relationship_tables.textproto ancestor_slice.sql ancestor_slice.out
 
 # Descendant slice table.
-relationship_tables.textproto descendant_slice.sql descendant_slice.out
\ No newline at end of file
+relationship_tables.textproto descendant_slice.sql descendant_slice.out
+
+# Connected/Following/Perceeding flow table.
+connected_flow_data.json connected_flow.sql connected_flow.out
\ No newline at end of file