Shared Memory Buffer: Binary Format

This CL introduces the core definitions of the shared memory
buffer binary interface and its accessor methods.
This ABI is meant to be backwards compatible, as
previous versions of the clients will rely on it.

Bug: 70284518,68854243
Test: perfetto_tests --gtest_filter=SharedMemoryABITest.*

Change-Id: I64d44757dd36783ef6c8c097cb479723cb7970fc
diff --git a/Android.bp b/Android.bp
index 9ef2dcf..1379d31 100644
--- a/Android.bp
+++ b/Android.bp
@@ -386,6 +386,8 @@
     "src/tracing/core/id_allocator_unittest.cc",
     "src/tracing/core/service_impl.cc",
     "src/tracing/core/service_impl_unittest.cc",
+    "src/tracing/core/shared_memory_abi.cc",
+    "src/tracing/core/shared_memory_abi_unittest.cc",
     "src/tracing/core/trace_config.cc",
     "src/tracing/core/trace_packet.cc",
     "src/tracing/core/trace_packet_unittest.cc",
diff --git a/include/perfetto/tracing/core/BUILD.gn b/include/perfetto/tracing/core/BUILD.gn
index cadc9f0..8d83c55 100644
--- a/include/perfetto/tracing/core/BUILD.gn
+++ b/include/perfetto/tracing/core/BUILD.gn
@@ -25,6 +25,7 @@
     "producer.h",
     "service.h",
     "shared_memory.h",
+    "shared_memory_abi.h",
     "trace_config.h",
     "trace_packet.h",
   ]
diff --git a/include/perfetto/tracing/core/basic_types.h b/include/perfetto/tracing/core/basic_types.h
index 2966d11..6b2a3a0 100644
--- a/include/perfetto/tracing/core/basic_types.h
+++ b/include/perfetto/tracing/core/basic_types.h
@@ -24,6 +24,11 @@
 using ProducerID = uint64_t;
 using DataSourceID = uint64_t;
 using DataSourceInstanceID = uint64_t;
+using WriterID = uint16_t;
+using BufferID = uint16_t;
+
+// Keep this in sync with SharedMemoryABI::PageHeader::target_buffer.
+static constexpr size_t kMaxTraceBuffers = 1ul << 16;
 
 }  // namespace perfetto
 
diff --git a/include/perfetto/tracing/core/shared_memory_abi.h b/include/perfetto/tracing/core/shared_memory_abi.h
new file mode 100644
index 0000000..5650291
--- /dev/null
+++ b/include/perfetto/tracing/core/shared_memory_abi.h
@@ -0,0 +1,542 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef INCLUDE_PERFETTO_TRACING_CORE_SHARED_MEMORY_ABI_H_
+#define INCLUDE_PERFETTO_TRACING_CORE_SHARED_MEMORY_ABI_H_
+
+#include <stddef.h>
+#include <stdint.h>
+
+#include <array>
+#include <atomic>
+#include <bitset>
+#include <thread>
+#include <type_traits>
+#include <utility>
+
+#include "perfetto/base/logging.h"
+
+namespace perfetto {
+
+// This file defines the binary interface of the memory buffers shared between
+// Producer and Service. This is a long-term stable ABI and has to be backwards
+// compatible to deal with mismatching Producer and Service versions.
+//
+// Overview
+// --------
+// SMB := "Shared Memory Buffer".
+// In the most typical case of a multi-process architecture (i.e. Producer and
+// Service are hosted by different processes), a Producer almost always means
+// a "client process producing data" (almost: in some cases a process might
+// host more than one Producer, if it links two independent libraries that both
+// use Perfetto tracing).
+// The Service has one SMB for each Producer.
+// A producer has one or (typically) more data sources. They all share the same
+// SMB.
+// The SMB is a staging area to decouple data sources living in the Producer
+// and allow them to do non-blocking async writes.
+// The SMB is *not* the ultimate logging buffer seen by the Consumer. That one
+// is larger (~MBs) and not shared with Producers.
+// Each SMB is small, typically a few KB. Its size is configurable by the
+// producer within a max limit of ~MB (see kMaxShmSize in service_impl.cc).
+// The SMB is partitioned into fixed-size Page(s). The size of the Pages is
+// determined by each Producer at connection time and cannot be changed.
+// Hence, different producers can have SMB(s) with different Page sizes from
+// each other, but the page size stays constant throughout the lifetime of the
+// SMB.
+// Page(s) are partitioned by the Producer into variable size Chunk(s):
+//
+// +------------+      +--------------------------+
+// | Producer 1 |  <-> |      SMB 1 [~32K - 1MB]  |
+// +------------+      +--------+--------+--------+
+//                     |  Page  |  Page  |  Page  |
+//                     +--------+--------+--------+
+//                     | Chunk  |        | Chunk  |
+//                     +--------+  Chunk +--------+ <----+
+//                     | Chunk  |        | Chunk  |      |
+//                     +--------+--------+--------+      +---------------------+
+//                                                       |       Service       |
+// +------------+      +--------------------------+      +---------------------+
+// | Producer 2 |  <-> |      SMB 2 [~32K - 1MB]  |     /| large ring buffers  |
+// +------------+      +--------+--------+--------+ <--+ | (100K - several MB) |
+//                     |  Page  |  Page  |  Page  |      +---------------------+
+//                     +--------+--------+--------+
+//                     | Chunk  |        | Chunk  |
+//                     +--------+  Chunk +--------+
+//                     | Chunk  |        | Chunk  |
+//                     +--------+--------+--------+
+//
+// * Sizes of both SMB and ring buffers are purely indicative and decided at
+// configuration time by the Producer (for SMB sizes) and the Consumer (for the
+// final ring buffer size).
+
+// Page
+// ----
+// A page is a portion of the shared memory buffer and defines the granularity
+// of the interaction between the Producer and tracing Service. When scanning
+// the shared memory buffer to determine if something should be moved to the
+// central logging buffers, the Service most of the times looks at and moves
+// whole pages. Similarly, the Producer sends an IPC to invite the Service to
+// drain the shared memory buffer only when a whole page is filled.
+// Having fixed the total SMB size (hence the total memory overhead), the page
+// size is a triangular tradeoff between:
+// 1) IPC traffic: smaller pages -> more IPCs.
+// 2) Producer lock freedom: larger pages -> larger chunks -> data sources can
+//    write more data without needing to swap chunks and synchronize.
+// 3) Risk of write-starving the SMB: larger pages -> higher chance that the
+//    Service won't manage to drain them and the SMB remains full.
+// The page size, on the other hand, has no implications on wasted memory due
+// to fragmentation (see Chunk below).
+// The size of the page is chosen by the Service at connection time and stays
+// fixed throughout the lifetime of the Producer. Different producers (i.e.
+// ~ different client processes) can use different page sizes.
+// The page size must be an integer multiple of 4k (this is to allow VM page
+// stealing optimizations) and obviously has to be an integer divisor of the
+// total SMB size.
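+// For instance (sizes purely illustrative): a 32 KB SMB could be partitioned
+// into eight 4 KB pages or four 8 KB pages, while an 8 KB page size with a
+// 12 KB SMB would be invalid, since the page size must be an integer divisor
+// of the SMB size.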
+
+// Chunk
+// -----
+// A chunk is a portion of a Page which is written and handled by a Producer.
+// A chunk contains a linear sequence of TracePacket(s) (the root proto).
+// A chunk cannot be written concurrently by two data sources. Protobufs must be
+// encoded as contiguous byte streams and cannot be interleaved. Therefore, on
+// the Producer side, a chunk is almost always owned exclusively by one thread
+// (modulo extremely peculiar slow-path cases).
+// Chunks are essentially single-writer single-thread lock-free arenas. Locking
+// happens only when a Chunk is full and a new one needs to be acquired.
+// Locking happens only within the scope of a Producer process. There is no
+// inter-process locking. The Producer cannot lock the Service and vice versa.
+// In the worst case, either of the two can starve the SMB, by marking all
+// chunks as either being read or written. But the only side effect of that is
+// losing trace data.
+// The Producer can partition each page into one of a limited number of
+// configurations (e.g., 1 page == 1 chunk, 1 page == 2 chunks and so on).
+
+// TracePacket
+// -----------
+// A TracePacket is the atom of tracing. Putting aside pages and chunks, a
+// trace is merely a sequence of TracePacket(s). TracePacket is the root
+// protobuf message.
+// A TracePacket can span across several chunks (hence even across several
+// pages). A TracePacket can therefore be >> chunk size, >> page size and even
+// >> SMB size. The Chunk header carries metadata to deal with the TracePacket
+// splitting case.
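+// As a purely illustrative example: with kPageDiv2 chunks (~2 KB each on a
+// 4 KB page), a ~5 KB TracePacket would span roughly three chunks. The tail of
+// the first chunk carries the first fragment (and sets the
+// kLastPacketContinuesOnNextChunk flag, see ChunkHeader below), the following
+// chunks carry the remaining fragments (and set
+// kFirstPacketContinuesFromPrevChunk). All of them share the same writer_id
+// and have consecutive chunk_id values, which is what allows the Service to
+// glue the fragments back together.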
+
+// Use only explicitly-sized types below. DO NOT use size_t or any other
+// architecture-dependent size in the struct fields. This buffer will be read
+// and written by processes that have a different bitness in the same OS.
+// Instead, it's fine to assume little-endianness. Big-endian is a dream we are
+// not currently pursuing.
+
+class SharedMemoryABI {
+ public:
+  // "14" is the max number that can be encoded in a 32 bit atomic word using
+  // 2 state bits per Chunk and leaving 4 bits for the page layout.
+  // See PageLayout below.
+  static constexpr size_t kMaxChunksPerPage = 14;
+
+  // Each TracePacket in the Chunk is prefixed by a 4-byte redundant varint
+  // (see proto_utils.h) stating its size.
+  static constexpr size_t kPacketHeaderSize = 4;
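+
+  // For instance (assuming the usual protobuf varint encoding, with the three
+  // leading bytes keeping their continuation bit set), a 41-byte TracePacket
+  // would be prefixed by the bytes {0xA9, 0x80, 0x80, 0x00}, which still
+  // decodes to 41.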
+
+  // Chunk states and transitions:
+  //    kChunkFree  <----------------+
+  //         |  (Producer)           |
+  //         V                       |
+  //  kChunkBeingWritten             |
+  //         |  (Producer)           |
+  //         V                       |
+  //  kChunkComplete                 |
+  //         |  (Service)            |
+  //         V                       |
+  //  kChunkBeingRead                |
+  //        |   (Service)            |
+  //        +------------------------+
+  enum ChunkState : uint32_t {
+    // The Chunk is free. The Service shall never touch it, the Producer can
+    // acquire it and transition it into kChunkBeingWritten.
+    kChunkFree = 0,
+
+    // The Chunk is being used by the Producer and is not complete yet.
+    // The Service shall never touch kChunkBeingWritten pages.
+    kChunkBeingWritten = 1,
+
+    // The Service is moving the page into its non-shared ring buffer. The
+    // Producer shall never touch kChunkBeingRead pages.
+    kChunkBeingRead = 2,
+
+    // The Producer is done writing the page and won't touch it again. The
+    // Service can now move it to its non-shared ring buffer.
+    // kAllChunksComplete relies on this being == 3.
+    kChunkComplete = 3,
+  };
+  static constexpr const char* kChunkStateStr[] = {"Free", "BeingWritten",
+                                                   "BeingRead", "Complete"};
+
+  enum PageLayout : uint32_t {
+    // The page is fully free and has not been partitioned yet.
+    kPageNotPartitioned = 0,
+
+    // This is a transitional state, set by TryPartitionPage(), after having
+    // successfully acquired a kPageNotPartitioned page, but before having set
+    // other flags in the page header (e.g., target_buffer).
+    kPageBeingPartitioned = 1,
+
+    // TODO(primiano): Aligning a chunk @ 16 bytes could allow the use of
+    // faster intrinsics based on quad-word moves. Do the math and check what
+    // the fragmentation loss would be.
+
+    // align4(X) := the largest integer N s.t. (N % 4) == 0 && N <= X.
+    // 8 == sizeof(PageHeader).
+    kPageDiv1 = 2,   // Only one chunk of size: PAGE_SIZE - 8.
+    kPageDiv2 = 3,   // Two chunks of size: align4((PAGE_SIZE - 8) / 2).
+    kPageDiv4 = 4,   // Four chunks of size: align4((PAGE_SIZE - 8) / 4).
+    kPageDiv7 = 5,   // Seven chunks of size: align4((PAGE_SIZE - 8) / 7).
+    kPageDiv14 = 6,  // Fourteen chunks of size: align4((PAGE_SIZE - 8) / 14).
+
+    // The rationale for 7 and 14 above is to maximize the page usage for the
+    // likely case of |page_size| == 4096:
+// (((4096 - 8) / 14) % 4) == 0, while (((4096 - 8) / 16) % 4) == 3. So
+    // Div16 would waste 3 * 16 = 48 bytes per page for chunk alignment gaps.
+
+    kPageDivReserved = 7,
+    kNumPageLayouts = 8,
+  };
+
+  // Keep this consistent with the PageLayout enum above.
+  static constexpr size_t kNumChunksForLayout[] = {0, 0, 1, 2, 4, 7, 14, 0};
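+
+  // As a worked example, with a 4096-byte page (4096 - 8 = 4088 usable bytes)
+  // the chunk sizes for each layout are:
+  //   kPageDiv1  ->  1 chunk  of 4088 bytes.
+  //   kPageDiv2  ->  2 chunks of 2044 bytes.
+  //   kPageDiv4  ->  4 chunks of 1020 bytes (8 bytes lost to alignment).
+  //   kPageDiv7  ->  7 chunks of 584 bytes.
+  //   kPageDiv14 -> 14 chunks of 292 bytes.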
+
+  // Layout of a Page.
+  // +===================================================+
+  // | Page header [8 bytes]                             |
+  // | Tells how many chunks there are, how big they are |
+  // | and their state (free, read, write, complete).    |
+  // +===================================================+
+  // +***************************************************+
+  // | Chunk #0 header [8 bytes]                         |
+  // | Tells how many packets there are and whether the  |
+  // | first and last ones are fragmented. It also has a |
+  // | chunk id to reassemble fragments.                 |
+  // +***************************************************+
+  // +---------------------------------------------------+
+  // | Packet #0 size [varint, up to 4 bytes]            |
+  // + - - - - - - - - - - - - - - - - - - - - - - - - - +
+  // | Packet #0 payload                                 |
+  // | A TracePacket protobuf message                    |
+  // +---------------------------------------------------+
+  //                         ...
+  // + . . . . . . . . . . . . . . . . . . . . . . . . . +
+  // |      Optional padding to maintain alignment       |
+  // + . . . . . . . . . . . . . . . . . . . . . . . . . +
+  // +---------------------------------------------------+
+  // | Packet #N size [varint, up to 4 bytes]            |
+  // + - - - - - - - - - - - - - - - - - - - - - - - - - +
+  // | Packet #N payload                                 |
+  // | A TracePacket protobuf message                    |
+  // +---------------------------------------------------+
+  //                         ...
+  // +***************************************************+
+  // | Chunk #M header [8 bytes]                         |
+  //                         ...
+
+  // Alignment applies to start offset only. The Chunk size is *not* aligned.
+  static constexpr uint32_t kChunkAlignment = 4;
+  static constexpr uint32_t kChunkShift = 2;
+  static constexpr uint32_t kChunkMask = 0x3;
+  static constexpr uint32_t kLayoutMask = 0x70000000;
+  static constexpr uint32_t kLayoutShift = 28;
+  static constexpr uint32_t kAllChunksMask = 0x0FFFFFFF;
+
+  // This assumes that kChunkComplete == 3.
+  static constexpr uint32_t kAllChunksComplete = 0x0FFFFFFF;
+  static constexpr uint32_t kAllChunksFree = 0;
+  static constexpr size_t kInvalidPageIdx = static_cast<size_t>(-1);
+
+  // There is one page header per page, at the beginning of the page.
+  struct PageHeader {
+    // |layout| bits:
+    // [31] [30:28] [27:26] ... [1:0]
+    //  |      |       |     |    |
+    //  |      |       |     |    +---------- ChunkState[0]
+    //  |      |       |     +--------------- ChunkState[12..1]
+    //  |      |       +--------------------- ChunkState[13]
+    //  |      +----------------------------- PageLayout (0 == page fully free)
+    //  +------------------------------------ Reserved for future use
+    std::atomic<uint32_t> layout;
+
+    // Tells the Service into which logging buffer the chunks contained in the
+    // page should be moved. This reflects the
+    // DataSourceConfig.target_buffer received at registration time.
+    // kMaxTraceBuffers in basic_types.h relies on the size of this.
+    std::atomic<uint16_t> target_buffer;
+    uint16_t reserved;
+  };
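+
+  // For example (values purely illustrative), a kPageDiv2 page (layout nibble
+  // == 3) whose chunk 0 is kChunkBeingWritten and whose chunk 1 is
+  // kChunkComplete would have:
+  //   layout == (3u << 28) | (3u << 2) | (1u << 0) == 0x3000000D.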
+
+  // There is one Chunk header per chunk (hence as many per page as the
+  // PageLayout dictates), at the beginning of each chunk.
+  struct ChunkHeader {
+    enum Flags : uint8_t {
+      // If set, the first TracePacket in the chunk is partial and continues
+      // from |chunk_id| - 1 (within the same |writer_id|).
+      kFirstPacketContinuesFromPrevChunk = 1 << 0,
+
+      // If set, the last TracePacket in the chunk is partial and continues on
+      // |chunk_id| + 1 (within the same |writer_id|).
+      kLastPacketContinuesOnNextChunk = 1 << 1,
+    };
+
+    struct PacketsState {
+      // Number of valid TracePacket protobuf messages contained in the chunk.
+      // Each TracePacket is prefixed by its own size. This field is
+      // monotonically updated by the Producer with release-store semantics
+      // after the packet has been written into the chunk.
+      uint16_t count;
+
+      uint8_t flags;
+      uint8_t reserved;
+    };
+
+    // This never changes throughout the life of the Chunk.
+    struct Identifier {
+      // chunk_id is a monotonic counter of the chunk within its own
+      // sequence. The tuple (writer_id, chunk_id) makes it possible to figure
+      // out whether two chunks for a data source are contiguous (and hence a
+      // trace packet spanning across them can be glued) or whether there are
+      // holes due to the ring buffer wrapping.
+      uint16_t chunk_id;
+
+      // A sequence identifies a linear stream of TracePacket(s) produced by
+      // the same data source.
+      unsigned writer_id : 10;  // kMaxWriterID relies on the size of this.
+
+      unsigned reserved : 6;
+    };
+
+    // Updated with release-store semantics
+    std::atomic<Identifier> identifier;
+    std::atomic<PacketsState> packets_state;
+  };
+  static constexpr size_t kMaxWriterID = (1 << 10) - 1;
+
+  class Chunk {
+   public:
+    Chunk();  // Constructs an invalid chunk.
+
+    // Chunk is move-only, to document the scope of the TryAcquire/Release
+    // operations below.
+    Chunk(const Chunk&) = delete;
+    Chunk operator=(const Chunk&) = delete;
+    Chunk(Chunk&&) noexcept = default;
+    Chunk& operator=(Chunk&&) = default;
+
+    uint8_t* begin() const { return begin_; }
+    uint8_t* end() const { return end_; }
+
+    // Size, including Chunk header.
+    size_t size() const { return static_cast<size_t>(end_ - begin_); }
+
+    uint8_t* payload_begin() const { return begin_ + sizeof(ChunkHeader); }
+
+    bool is_valid() const { return begin_ && end_ > begin_; }
+
+    ChunkHeader* header() { return reinterpret_cast<ChunkHeader*>(begin_); }
+
+    // Returns the count of packets and the flags with acquire-load semantics.
+    std::pair<uint16_t, uint8_t> GetPacketCountAndFlags() {
+      auto pstate = header()->packets_state.load(std::memory_order_acquire);
+      return std::make_pair(pstate.count, pstate.flags);
+    }
+
+    // Increases |packets_state.count| with release semantics (note, however,
+    // that the packet count is incremented before starting to write a packet).
+    // The increment is atomic but NOT race-free (i.e. no CAS). Only the
+    // Producer is supposed to perform this increment thread-safely. A Chunk
+    // cannot be shared by multiple threads without locking.
+    // The packet count is cleared by TryAcquireChunk(), when passing the new
+    // header for the chunk.
+    void IncrementPacketCount() {
+      ChunkHeader* chunk_header = header();
+      auto pstate = chunk_header->packets_state.load(std::memory_order_relaxed);
+      pstate.count++;
+      chunk_header->packets_state.store(pstate, std::memory_order_release);
+    }
+
+    // Flags are cleared by TryAcquireChunk(), by passing the new header for
+    // the chunk.
+    void SetFlag(ChunkHeader::Flags flag) {
+      ChunkHeader* chunk_header = header();
+      auto pstate = chunk_header->packets_state.load(std::memory_order_relaxed);
+      pstate.flags |= static_cast<uint8_t>(flag);
+      chunk_header->packets_state.store(pstate, std::memory_order_release);
+    }
+
+   private:
+    friend class SharedMemoryABI;
+    Chunk(uint8_t* begin, size_t size);
+
+    // Don't add extra fields, keep the move operator fast.
+    uint8_t* begin_ = nullptr;
+    uint8_t* end_ = nullptr;
+  };
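+
+  // A minimal usage sketch (illustrative only; bound checks and the actual
+  // varint encoding are omitted) of how a Producer could append a serialized
+  // TracePacket |packet| of |packet_size| bytes to an acquired chunk:
+  //
+  //   chunk.IncrementPacketCount();  // Bumped when starting a new packet, see
+  //                                  // the comment on IncrementPacketCount().
+  //   uint8_t* wptr = chunk.payload_begin();
+  //   // Write the kPacketHeaderSize-byte redundant varint size prefix at
+  //   // |wptr| (see proto_utils.h), then the payload right after it:
+  //   memcpy(wptr + kPacketHeaderSize, packet, packet_size);
+  //   if (packet_continues_on_next_chunk)  // Hypothetical condition.
+  //     chunk.SetFlag(ChunkHeader::kLastPacketContinuesOnNextChunk);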
+
+  // Constructs an instance from an existing shared memory buffer.
+  SharedMemoryABI(uint8_t* start, size_t size, size_t page_size);
+
+  uint8_t* start() const { return start_; }
+  uint8_t* end() const { return start_ + size_; }
+  size_t size() const { return size_; }
+  size_t page_size() const { return page_size_; }
+  size_t num_pages() const { return num_pages_; }
+
+  uint8_t* page_start(size_t page_idx) {
+    PERFETTO_DCHECK(page_idx < num_pages_);
+    return start_ + page_size_ * page_idx;
+  }
+
+  PageHeader* page_header(size_t page_idx) {
+    return reinterpret_cast<PageHeader*>(page_start(page_idx));
+  }
+
+  // Returns true if the page is fully clear and has not been partitioned yet.
+  // The state of the page can change at any point after this returns (or even
+  // before). The Producer should use this only as a hint to decide whether it
+  // should TryPartitionPage() or acquire an individual chunk.
+  bool is_page_free(size_t page_idx) {
+    return page_header(page_idx)->layout.load(std::memory_order_relaxed) == 0;
+  }
+
+  // Returns true if all chunks in the page are kChunkComplete. As above, this
+  // is advisory only. The Service is supposed to use this only to decide
+  // whether to TryAcquireAllChunksForReading() or not.
+  bool is_page_complete(size_t page_idx) {
+    auto layout = page_header(page_idx)->layout.load(std::memory_order_relaxed);
+    const size_t num_chunks = GetNumChunksForLayout(layout);
+    if (num_chunks == 0)
+      return false;  // Non-partitioned pages cannot be complete.
+    return (layout & kAllChunksMask) ==
+           (kAllChunksComplete & ((1 << (num_chunks * kChunkShift)) - 1));
+  }
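+
+  // For instance, for a kPageDiv2 page |num_chunks| == 2, so the mask above is
+  // (1 << 4) - 1 == 0xF and the page is complete iff both 2-bit chunk states
+  // in |layout| are 0b11 (kChunkComplete == 3), the remaining chunk bits being
+  // unused (zero) on such a page.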
+
+  // For testing / debugging only.
+  std::string page_header_dbg(size_t page_idx) {
+    uint32_t x = page_header(page_idx)->layout.load(std::memory_order_relaxed);
+    return std::bitset<32>(x).to_string();
+  }
+
+  // For testing / debugging only.
+  uint32_t page_layout_dbg(size_t page_idx) {
+    return page_header(page_idx)->layout.load(std::memory_order_relaxed);
+  }
+
+  // Returns a bitmap in which each bit is set if the corresponding Chunk exists
+  // in the page (according to the page layout) and is free. If the page is not
+  // partitioned it returns 0 (as if the page had no free chunks).
+  size_t GetFreeChunks(size_t page_idx);
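+
+  // For example, on a kPageDiv4 page where chunk 1 is kChunkBeingWritten and
+  // the other three chunks are kChunkFree, GetFreeChunks() returns
+  // 0b1101 (0xD).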
+
+  // Tries to atomically partition a page with the given |layout|. Returns true
+  // if the page was free and has been partitioned with the given |layout|,
+  // false if the page wasn't free anymore by the time we got there.
+  // If it succeeds, all the chunks are atomically set to the kChunkFree state
+  // and the target_buffer is stored with release-store semantics.
+  bool TryPartitionPage(size_t page_idx,
+                        PageLayout layout,
+                        size_t target_buffer);
+
+  // Tries to atomically mark a single chunk within the page as
+  // kChunkBeingWritten. Returns an invalid chunk if the page is not partitioned
+  // or the chunk is not in the kChunkFree state. If it succeeds, it sets the
+  // chunk header to |header|.
+  Chunk TryAcquireChunkForWriting(size_t page_idx,
+                                  size_t chunk_idx,
+                                  size_t expected_target_buffer,
+                                  const ChunkHeader* header) {
+    return TryAcquireChunk(page_idx, chunk_idx, expected_target_buffer,
+                           kChunkBeingWritten, header);
+  }
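+
+  // A hypothetical Producer-side flow tying the above together (|abi|,
+  // |target_buffer| and |header| are defined by the caller; error handling and
+  // retry policy are omitted):
+  //
+  //   for (size_t page = 0; page < abi.num_pages(); page++) {
+  //     if (abi.is_page_free(page))
+  //       abi.TryPartitionPage(page, kPageDiv4, target_buffer);
+  //     Chunk chunk =
+  //         abi.TryAcquireChunkForWriting(page, 0, target_buffer, &header);
+  //     if (!chunk.is_valid())
+  //       continue;  // Lost a race or wrong target_buffer, try the next page.
+  //     // ... write TracePacket(s) into the chunk (see the sketch above) ...
+  //     abi.ReleaseChunkAsComplete(std::move(chunk));
+  //     break;
+  //   }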
+
+  // Similar to TryAcquireChunkForWriting. Fails if the chunk isn't in the
+  // kChunkComplete state.
+  Chunk TryAcquireChunkForReading(size_t page_idx,
+                                  size_t chunk_idx,
+                                  size_t expected_target_buffer) {
+    return TryAcquireChunk(page_idx, chunk_idx, expected_target_buffer,
+                           kChunkBeingRead, nullptr);
+  }
+
+  // Used by the Service to take full ownership of all the chunks in a page in
+  // one shot. It tries to atomically migrate all chunks into the
+  // kChunkBeingRead state. Can only be done if all chunks are either kChunkFree
+  // or kChunkComplete. If this fails, the Service has to fall back to
+  // acquiring the chunks individually.
+  bool TryAcquireAllChunksForReading(size_t page_idx);
+  void ReleaseAllChunksAsFree(size_t page_idx);
+
+  // The caller must have successfully called TryAcquireAllChunksForReading().
+  Chunk GetChunkUnchecked(size_t page_idx,
+                          uint32_t page_layout,
+                          size_t chunk_idx);
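+
+  // A hypothetical Service-side drain of a fully-complete page (illustrative
+  // only; if the batch acquisition fails, the Service has to fall back to
+  // TryAcquireChunkForReading() on the individual chunks):
+  //
+  //   if (abi.is_page_complete(page) &&
+  //       abi.TryAcquireAllChunksForReading(page)) {
+  //     const uint32_t layout = abi.page_layout_dbg(page);  // Or equivalent
+  //                                                         // layout load.
+  //     for (size_t i = 0; i < GetNumChunksForLayout(layout); i++) {
+  //       Chunk chunk = abi.GetChunkUnchecked(page, layout, i);
+  //       // ... copy the chunk contents into the central ring buffer ...
+  //     }
+  //     abi.ReleaseAllChunksAsFree(page);
+  //   }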
+
+  // Puts a chunk into the kChunkComplete state.
+  // If all chunks in the page are kChunkComplete returns the page index,
+  // otherwise returns kInvalidPageIdx.
+  size_t ReleaseChunkAsComplete(Chunk chunk) {
+    return ReleaseChunk(std::move(chunk), kChunkComplete);
+  }
+
+  // Puts a chunk into the kChunkFree state.
+  // If all chunks in the page are kChunkFree returns the page index,
+  // otherwise returns kInvalidPageIdx.
+  size_t ReleaseChunkAsFree(Chunk chunk) {
+    return ReleaseChunk(std::move(chunk), kChunkFree);
+  }
+
+  ChunkState GetChunkState(size_t page_idx, size_t chunk_idx) {
+    PageHeader* phdr = page_header(page_idx);
+    uint32_t layout = phdr->layout.load(std::memory_order_relaxed);
+    return static_cast<ChunkState>((layout >> (chunk_idx * kChunkShift)) &
+                                   kChunkMask);
+  }
+
+  std::pair<size_t, size_t> GetPageAndChunkIndex(const Chunk& chunk);
+
+  static constexpr size_t GetNumChunksForLayout(uint32_t page_layout) {
+    return kNumChunksForLayout[(page_layout & kLayoutMask) >> kLayoutShift];
+  }
+
+ private:
+  SharedMemoryABI(const SharedMemoryABI&) = delete;
+  SharedMemoryABI& operator=(const SharedMemoryABI&) = delete;
+
+  size_t GetChunkSizeForLayout(uint32_t page_layout) const {
+    return chunk_sizes_[(page_layout & kLayoutMask) >> kLayoutShift];
+  }
+
+  Chunk TryAcquireChunk(size_t page_idx,
+                        size_t chunk_idx,
+                        size_t expected_target_buffer,
+                        ChunkState,
+                        const ChunkHeader*);
+  size_t ReleaseChunk(Chunk chunk, ChunkState);
+
+  uint8_t* const start_;
+  const size_t size_;
+  const size_t page_size_;
+  const size_t num_pages_;
+  std::array<size_t, kNumPageLayouts> const chunk_sizes_;
+};
+
+}  // namespace perfetto
+
+#endif  // INCLUDE_PERFETTO_TRACING_CORE_SHARED_MEMORY_ABI_H_
diff --git a/src/tracing/BUILD.gn b/src/tracing/BUILD.gn
index 2cef20a..33697e5 100644
--- a/src/tracing/BUILD.gn
+++ b/src/tracing/BUILD.gn
@@ -34,6 +34,7 @@
     "core/id_allocator.h",
     "core/service_impl.cc",
     "core/service_impl.h",
+    "core/shared_memory_abi.cc",
     "core/trace_config.cc",
     "core/trace_packet.cc",
   ]
@@ -82,6 +83,7 @@
     "core/chunked_protobuf_input_stream_unittest.cc",
     "core/id_allocator_unittest.cc",
     "core/service_impl_unittest.cc",
+    "core/shared_memory_abi_unittest.cc",
     "core/trace_packet_unittest.cc",
     "ipc/posix_shared_memory_unittest.cc",
     "test/test_shared_memory.cc",
diff --git a/src/tracing/core/shared_memory_abi.cc b/src/tracing/core/shared_memory_abi.cc
new file mode 100644
index 0000000..5fa8310
--- /dev/null
+++ b/src/tracing/core/shared_memory_abi.cc
@@ -0,0 +1,341 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an "AS
+ * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
+ */
+#include "perfetto/tracing/core/shared_memory_abi.h"
+
+#include <sys/mman.h>
+
+#include "perfetto/base/utils.h"
+#include "perfetto/tracing/core/basic_types.h"
+
+namespace perfetto {
+
+namespace {
+// Returns the largest 4-byte-aligned chunk size <= |page_size| / |divider|
+// for each divider in PageLayout.
+constexpr size_t GetChunkSize(size_t page_size, size_t divider) {
+  return ((page_size - sizeof(SharedMemoryABI::PageHeader)) / divider) & ~3UL;
+}
+
+// Initializer for the const |chunk_sizes_| array.
+std::array<size_t, SharedMemoryABI::kNumPageLayouts> InitChunkSizes(
+    size_t page_size) {
+  static_assert(SharedMemoryABI::kNumPageLayouts ==
+                    base::ArraySize(SharedMemoryABI::kNumChunksForLayout),
+                "kNumPageLayouts out of date");
+  std::array<size_t, SharedMemoryABI::kNumPageLayouts> res = {};
+  for (size_t i = 0; i < SharedMemoryABI::kNumPageLayouts; i++) {
+    size_t num_chunks = SharedMemoryABI::kNumChunksForLayout[i];
+    res[i] = num_chunks == 0 ? 0 : GetChunkSize(page_size, num_chunks);
+  }
+  return res;
+}
+
+}  // namespace
+
+// static
+constexpr size_t SharedMemoryABI::kNumChunksForLayout[];
+constexpr const char* SharedMemoryABI::kChunkStateStr[];
+constexpr const size_t SharedMemoryABI::kInvalidPageIdx;
+
+SharedMemoryABI::SharedMemoryABI(uint8_t* start, size_t size, size_t page_size)
+    : start_(start),
+      size_(size),
+      page_size_(page_size),
+      num_pages_(size / page_size),
+      chunk_sizes_(InitChunkSizes(page_size)) {
+  static_assert(sizeof(PageHeader) == 8, "PageHeader size");
+  static_assert(sizeof(ChunkHeader) == 8, "ChunkHeader size");
+  static_assert(sizeof(ChunkHeader::PacketsState) == 4, "PacketsState size");
+  static_assert(alignof(ChunkHeader) == kChunkAlignment,
+                "ChunkHeader alignment");
+
+  // In theory std::atomic does not guarantee that the underlying type
+  // consists only of the actual atomic word. Theoretically it could have
+  // locks or other state. In practice most implementations just implement
+  // them without extra state. The code below overlays the atomic into the
+  // SMB, hence relies on this implementation detail. This should be fine
+  // pragmatically (Chrome's base makes the same assumption), but let's have a
+  // check for this.
+  static_assert(sizeof(std::atomic<uint32_t>) == sizeof(uint32_t) &&
+                    sizeof(std::atomic<uint16_t>) == sizeof(uint16_t),
+                "Incompatible STL <atomic> implementation");
+
+  // Check that kAllChunksComplete and kAllChunksFree are consistent with the
+  // ChunkState enum values.
+
+  // These must be zero because we rely on zero-initialized memory being
+  // interpreted as "free".
+  static_assert(kChunkFree == 0 && kAllChunksFree == 0,
+                "kChunkFree and kAllChunksFree must be 0");
+
+  static_assert((kAllChunksComplete & kChunkMask) == kChunkComplete,
+                "kAllChunksComplete out of sync with kChunkComplete");
+
+  // Sanity check the consistency of the kMax... constants.
+  ChunkHeader::Identifier chunk_id = {};
+  PERFETTO_CHECK((chunk_id.writer_id -= 1) == kMaxWriterID);
+
+  PageHeader phdr;
+  phdr.target_buffer.store(-1);
+  PERFETTO_CHECK(phdr.target_buffer.load() >= kMaxTraceBuffers - 1);
+
+  PERFETTO_CHECK(page_size >= 4096);
+  PERFETTO_CHECK(page_size % 4096 == 0);
+  PERFETTO_CHECK(reinterpret_cast<uintptr_t>(start) % 4096 == 0);
+  PERFETTO_CHECK(size % page_size == 0);
+}
+
+SharedMemoryABI::Chunk SharedMemoryABI::GetChunkUnchecked(size_t page_idx,
+                                                          uint32_t page_layout,
+                                                          size_t chunk_idx) {
+  const size_t num_chunks = GetNumChunksForLayout(page_layout);
+  PERFETTO_DCHECK(chunk_idx < num_chunks);
+  // Compute the chunk virtual address and write it into |chunk|.
+  const size_t chunk_size = GetChunkSizeForLayout(page_layout);
+  size_t chunk_offset_in_page = sizeof(PageHeader) + chunk_idx * chunk_size;
+
+  Chunk chunk(page_start(page_idx) + chunk_offset_in_page, chunk_size);
+  PERFETTO_DCHECK(chunk.end() <= end());
+  return chunk;
+}
+
+SharedMemoryABI::Chunk SharedMemoryABI::TryAcquireChunk(
+    size_t page_idx,
+    size_t chunk_idx,
+    size_t expected_target_buffer,
+    ChunkState desired_chunk_state,
+    const ChunkHeader* header) {
+  PERFETTO_DCHECK(desired_chunk_state == kChunkBeingRead ||
+                  desired_chunk_state == kChunkBeingWritten);
+  PageHeader* phdr = page_header(page_idx);
+  uint32_t layout;
+  uint32_t attempts = 1000;
+  do {
+    layout = phdr->layout.load(std::memory_order_acquire);
+    if (__builtin_expect(
+            (layout & kLayoutMask) >> kLayoutShift != kPageBeingPartitioned,
+            true)) {
+      break;
+    }
+    std::this_thread::yield();
+  } while (--attempts);
+  // If |attempts| hits 0, |num_chunks| below will be 0 and this function will
+  // fail, returning an invalid Chunk.
+  const size_t num_chunks = GetNumChunksForLayout(layout);
+
+  // The page layout has changed (or the page is free).
+  if (chunk_idx >= num_chunks)
+    return Chunk();
+
+  // The page has been acquired by a writer that is targeting a different
+  // buffer. The caller has to try with another page.
+  if (phdr->target_buffer.load(std::memory_order_relaxed) !=
+      expected_target_buffer) {
+    return Chunk();
+  }
+
+  // Verify that the chunk is still in a state that allows the transition to
+  // |desired_chunk_state|. The only allowed transitions are:
+  // 1. kChunkFree -> kChunkBeingWritten (Producer).
+  // 2. kChunkComplete -> kChunkBeingRead (Service).
+  ChunkState expected_chunk_state =
+      desired_chunk_state == kChunkBeingWritten ? kChunkFree : kChunkComplete;
+  auto cur_chunk_state = (layout >> (chunk_idx * kChunkShift)) & kChunkMask;
+  if (cur_chunk_state != expected_chunk_state)
+    return Chunk();
+
+  uint32_t next_layout = layout;
+  next_layout &= ~(kChunkMask << (chunk_idx * kChunkShift));
+  next_layout |= (desired_chunk_state << (chunk_idx * kChunkShift));
+  if (!phdr->layout.compare_exchange_strong(layout, next_layout,
+                                            std::memory_order_acq_rel)) {
+    // TODO: returning here is too pessimistic. We should look at the returned
+    // |layout| to figure out if some other writer thread took the same chunk
+    // (in which case we should immediately return false) or if they took
+    // another chunk in the same page (in which case we should just retry).
+    return Chunk();
+  }
+
+  // Compute the chunk virtual address and write it into |chunk|.
+  Chunk chunk = GetChunkUnchecked(page_idx, layout, chunk_idx);
+  if (desired_chunk_state == kChunkBeingWritten) {
+    PERFETTO_DCHECK(header);
+    ChunkHeader* new_header = chunk.header();
+    new_header->packets_state.store(header->packets_state,
+                                    std::memory_order_relaxed);
+    new_header->identifier.store(header->identifier, std::memory_order_release);
+  }
+  return chunk;
+}
+
+bool SharedMemoryABI::TryPartitionPage(size_t page_idx,
+                                       PageLayout layout,
+                                       size_t target_buffer) {
+  PERFETTO_DCHECK(target_buffer < kMaxTraceBuffers);
+  PERFETTO_DCHECK(layout >= kPageDiv1 && layout <= kPageDiv14);
+  uint32_t expected_layout = 0;  // Free page.
+  uint32_t next_layout = (kPageBeingPartitioned << kLayoutShift) & kLayoutMask;
+  PageHeader* phdr = page_header(page_idx);
+  if (!phdr->layout.compare_exchange_strong(expected_layout, next_layout,
+                                            std::memory_order_acq_rel)) {
+    return false;
+  }
+
+  // Store any page flag before storing the final |layout|. |layout| is read
+  // with acquire semantics.
+  phdr->target_buffer.store(static_cast<uint16_t>(target_buffer),
+                            std::memory_order_relaxed);
+  phdr->layout.store((layout << kLayoutShift) & kLayoutMask,
+                     std::memory_order_release);
+  return true;
+}
+
+size_t SharedMemoryABI::GetFreeChunks(size_t page_idx) {
+  uint32_t layout =
+      page_header(page_idx)->layout.load(std::memory_order_relaxed);
+  const size_t num_chunks = GetNumChunksForLayout(layout);
+  size_t res = 0;
+  for (size_t i = 0; i < num_chunks; i++) {
+    res |= ((layout & kChunkMask) == kChunkFree) ? (1 << i) : 0;
+    layout >>= kChunkShift;
+  }
+  return res;
+}
+
+size_t SharedMemoryABI::ReleaseChunk(Chunk chunk,
+                                     ChunkState desired_chunk_state) {
+  PERFETTO_DCHECK(desired_chunk_state == kChunkComplete ||
+                  desired_chunk_state == kChunkFree);
+
+  size_t page_idx;
+  size_t chunk_idx;
+  std::tie(page_idx, chunk_idx) = GetPageAndChunkIndex(chunk);
+
+  for (int attempt = 0; attempt < 64; attempt++) {
+    PageHeader* phdr = page_header(page_idx);
+    uint32_t layout = phdr->layout.load(std::memory_order_relaxed);
+    const size_t page_chunk_size = GetChunkSizeForLayout(layout);
+    PERFETTO_CHECK(chunk.size() == page_chunk_size);
+    const uint32_t chunk_state =
+        ((layout >> (chunk_idx * kChunkShift)) & kChunkMask);
+
+    // Verify that the chunk is still in a state that allows the transition to
+    // |desired_chunk_state|. The only allowed transitions are:
+    // 1. kChunkBeingWritten -> kChunkComplete (Producer).
+    // 2. kChunkBeingRead -> kChunkFree (Service).
+    ChunkState expected_chunk_state;
+    uint32_t all_chunks_state;
+    if (desired_chunk_state == kChunkComplete) {
+      expected_chunk_state = kChunkBeingWritten;
+      all_chunks_state = kAllChunksComplete;
+    } else {
+      expected_chunk_state = kChunkBeingRead;
+      all_chunks_state = kAllChunksFree;
+    }
+    const size_t num_chunks = GetNumChunksForLayout(layout);
+    all_chunks_state &= (1 << (num_chunks * kChunkShift)) - 1;
+    PERFETTO_CHECK(chunk_state == expected_chunk_state);
+    uint32_t next_layout = layout;
+    next_layout &= ~(kChunkMask << (chunk_idx * kChunkShift));
+    next_layout |= (desired_chunk_state << (chunk_idx * kChunkShift));
+
+    // If we are freeing a chunk and all the other chunks in the page are free
+    // we should de-partition the page and mark it as clear.
+    // TODO: maybe even madvise() it?
+    if ((next_layout & kAllChunksMask) == kAllChunksFree)
+      next_layout = 0;
+
+    if (phdr->layout.compare_exchange_strong(layout, next_layout,
+                                             std::memory_order_acq_rel)) {
+      return (next_layout & kAllChunksMask) == all_chunks_state
+                 ? page_idx
+                 : kInvalidPageIdx;
+    }
+    std::this_thread::yield();
+  }
+  // Too much contention on this page. Give up. This page will be left pending
+  // forever but there isn't much more we can do at this point.
+  PERFETTO_DCHECK(false);
+  return kInvalidPageIdx;
+}
+
+bool SharedMemoryABI::TryAcquireAllChunksForReading(size_t page_idx) {
+  PageHeader* phdr = page_header(page_idx);
+  uint32_t layout = phdr->layout.load(std::memory_order_relaxed);
+  const size_t num_chunks = GetNumChunksForLayout(layout);
+  if (num_chunks == 0)
+    return false;
+  uint32_t next_layout = layout & kLayoutMask;
+  for (size_t chunk_idx = 0; chunk_idx < num_chunks; chunk_idx++) {
+    const uint32_t chunk_state =
+        ((layout >> (chunk_idx * kChunkShift)) & kChunkMask);
+    switch (chunk_state) {
+      case kChunkBeingWritten:
+        return false;
+      case kChunkBeingRead:
+      case kChunkComplete:
+        next_layout |= kChunkBeingRead << (chunk_idx * kChunkShift);
+        break;
+      case kChunkFree:
+        next_layout |= kChunkFree << (chunk_idx * kChunkShift);
+        break;
+    }
+  }
+  return phdr->layout.compare_exchange_strong(layout, next_layout,
+                                              std::memory_order_acq_rel);
+}
+
+void SharedMemoryABI::ReleaseAllChunksAsFree(size_t page_idx) {
+  PageHeader* phdr = page_header(page_idx);
+  phdr->layout.store(0, std::memory_order_release);
+  uint8_t* page_start = start_ + page_idx * page_size_;
+  // TODO: On Linux/Android this should be MADV_REMOVE if we use
+  // memfd_create() and tmpfs supports hole punching (need to consult kernel
+  // sources).
+  int ret = madvise(reinterpret_cast<uint8_t*>(page_start), page_size_,
+                    MADV_DONTNEED);
+  PERFETTO_DCHECK(ret == 0);
+}
+
+SharedMemoryABI::Chunk::Chunk() = default;
+
+SharedMemoryABI::Chunk::Chunk(uint8_t* begin, size_t size)
+    : begin_(begin), end_(begin + size) {
+  PERFETTO_CHECK(reinterpret_cast<uintptr_t>(begin) % kChunkAlignment == 0);
+  PERFETTO_CHECK(end_ >= begin_);
+}
+
+std::pair<size_t, size_t> SharedMemoryABI::GetPageAndChunkIndex(
+    const Chunk& chunk) {
+  PERFETTO_DCHECK(chunk.is_valid());
+  PERFETTO_DCHECK(chunk.begin() >= start_);
+  PERFETTO_DCHECK(chunk.end() <= start_ + size_);
+
+  // TODO(primiano): The divisions below could be avoided if we cached
+  // |page_shift_|.
+  const uintptr_t rel_addr = chunk.begin() - start_;
+  const size_t page_idx = rel_addr / page_size_;
+  const size_t offset = rel_addr % page_size_;
+  PERFETTO_DCHECK(offset >= sizeof(PageHeader));
+  PERFETTO_DCHECK(offset % kChunkAlignment == 0);
+  PERFETTO_DCHECK((offset - sizeof(PageHeader)) % chunk.size() == 0);
+  const size_t chunk_idx = (offset - sizeof(PageHeader)) / chunk.size();
+  PERFETTO_DCHECK(chunk_idx < kMaxChunksPerPage);
+  PERFETTO_DCHECK(chunk_idx < GetNumChunksForLayout(page_layout_dbg(page_idx)));
+  return std::make_pair(page_idx, chunk_idx);
+}
+
+}  // namespace perfetto
diff --git a/src/tracing/core/shared_memory_abi_unittest.cc b/src/tracing/core/shared_memory_abi_unittest.cc
new file mode 100644
index 0000000..d460759
--- /dev/null
+++ b/src/tracing/core/shared_memory_abi_unittest.cc
@@ -0,0 +1,306 @@
+/*
+ * Copyright (C) 2017 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "perfetto/tracing/core/shared_memory_abi.h"
+
+#include "gtest/gtest.h"
+#include "perfetto/base/utils.h"
+
+namespace perfetto {
+namespace {
+
+using testing::ValuesIn;
+using Chunk = SharedMemoryABI::Chunk;
+using ChunkHeader = SharedMemoryABI::ChunkHeader;
+
+class SharedMemoryABITest : public ::testing::TestWithParam<size_t> {
+ public:
+  void SetUp() override {
+    page_size_ = GetParam();
+    buf_size_ = page_size_ * kNumPages;
+    void* mem = nullptr;
+    PERFETTO_CHECK(posix_memalign(&mem, page_size_, buf_size_) == 0);
+    buf_.reset(reinterpret_cast<uint8_t*>(mem));
+    memset(buf_.get(), 0, buf_size_);
+  }
+
+  void TearDown() override { buf_.reset(); }
+
+  const size_t kNumPages = 10;
+  std::unique_ptr<uint8_t, base::FreeDeleter> buf_;
+  size_t buf_size_;
+  size_t page_size_;
+};
+
+size_t const kPageSizes[] = {4096, 8192, 16384, 32768, 65536};
+INSTANTIATE_TEST_CASE_P(PageSize, SharedMemoryABITest, ValuesIn(kPageSizes));
+
+TEST_P(SharedMemoryABITest, NominalCases) {
+  SharedMemoryABI abi(buf_.get(), buf_size_, page_size_);
+
+  ASSERT_EQ(buf_.get(), abi.start());
+  ASSERT_EQ(buf_.get() + buf_size_, abi.end());
+  ASSERT_EQ(buf_size_, abi.size());
+  ASSERT_EQ(page_size_, abi.page_size());
+  ASSERT_EQ(kNumPages, abi.num_pages());
+
+  for (size_t i = 0; i < kNumPages; i++) {
+    ASSERT_TRUE(abi.is_page_free(i));
+    ASSERT_FALSE(abi.is_page_complete(i));
+    // GetFreeChunks() should return 0 for an unpartitioned page.
+    ASSERT_EQ(0u, abi.GetFreeChunks(i));
+  }
+
+  ASSERT_TRUE(abi.TryPartitionPage(0, SharedMemoryABI::kPageDiv1, 10));
+  ASSERT_EQ(0x01u, abi.GetFreeChunks(0));
+
+  ASSERT_TRUE(abi.TryPartitionPage(1, SharedMemoryABI::kPageDiv2, 11));
+  ASSERT_EQ(0x03u, abi.GetFreeChunks(1));
+
+  ASSERT_TRUE(abi.TryPartitionPage(2, SharedMemoryABI::kPageDiv4, 12));
+  ASSERT_EQ(0x0fu, abi.GetFreeChunks(2));
+
+  ASSERT_TRUE(abi.TryPartitionPage(3, SharedMemoryABI::kPageDiv7, 13));
+  ASSERT_EQ(0x7fu, abi.GetFreeChunks(3));
+
+  ASSERT_TRUE(abi.TryPartitionPage(4, SharedMemoryABI::kPageDiv14, 14));
+  ASSERT_EQ(0x3fffu, abi.GetFreeChunks(4));
+
+  // Repartitioning an existing page must fail.
+  ASSERT_FALSE(abi.TryPartitionPage(0, SharedMemoryABI::kPageDiv1, 10));
+  ASSERT_FALSE(abi.TryPartitionPage(4, SharedMemoryABI::kPageDiv14, 14));
+
+  for (size_t i = 0; i <= 4; i++) {
+    ASSERT_FALSE(abi.is_page_free(i));
+    ASSERT_FALSE(abi.is_page_complete(i));
+  }
+
+  uint16_t last_chunk_id = 0;
+  unsigned last_writer_id = 0;
+  uint8_t* last_chunk_begin = nullptr;
+  uint8_t* last_chunk_end = nullptr;
+
+  for (size_t page_idx = 0; page_idx <= 4; page_idx++) {
+    uint8_t* const page_start = buf_.get() + page_idx * page_size_;
+    uint8_t* const page_end = page_start + page_size_;
+    const size_t num_chunks =
+        SharedMemoryABI::GetNumChunksForLayout(abi.page_layout_dbg(page_idx));
+    const size_t target_buffer = 10 + page_idx;
+    Chunk chunks[14];
+
+    for (size_t chunk_idx = 0; chunk_idx < num_chunks; chunk_idx++) {
+      Chunk& chunk = chunks[chunk_idx];
+      ChunkHeader header{};
+
+      ASSERT_EQ(SharedMemoryABI::kChunkFree,
+                abi.GetChunkState(page_idx, chunk_idx));
+      uint16_t chunk_id = ++last_chunk_id;
+      last_writer_id = (last_writer_id + 1) & SharedMemoryABI::kMaxWriterID;
+      unsigned writer_id = last_writer_id;
+      header.identifier.store({chunk_id, writer_id, 0 /* reserved */});
+
+      uint16_t packets_count = static_cast<uint16_t>(chunk_idx * 10);
+      uint8_t flags = static_cast<uint8_t>(0xffu - chunk_idx);
+      header.packets_state.store({packets_count, flags, 0 /* reserved */});
+
+      // Acquiring a chunk with a different target_buffer should fail.
+      chunk = abi.TryAcquireChunkForWriting(page_idx, chunk_idx,
+                                            target_buffer + 1, &header);
+      ASSERT_FALSE(chunk.is_valid());
+      ASSERT_EQ(SharedMemoryABI::kChunkFree,
+                abi.GetChunkState(page_idx, chunk_idx));
+
+      // But acquiring with the right |target_buffer| should succeed.
+      chunk = abi.TryAcquireChunkForWriting(page_idx, chunk_idx, target_buffer,
+                                            &header);
+      ASSERT_TRUE(chunk.is_valid());
+      ASSERT_EQ(SharedMemoryABI::kChunkBeingWritten,
+                abi.GetChunkState(page_idx, chunk_idx));
+
+      // Sanity check chunk bounds.
+      size_t expected_chunk_size =
+          (page_size_ - sizeof(SharedMemoryABI::PageHeader)) / num_chunks;
+      expected_chunk_size = expected_chunk_size - (expected_chunk_size % 4);
+      ASSERT_EQ(expected_chunk_size, chunk.size());
+      ASSERT_GT(chunk.begin(), page_start);
+      ASSERT_GT(chunk.begin(), last_chunk_begin);
+      ASSERT_GE(chunk.begin(), last_chunk_end);
+      ASSERT_LE(chunk.end(), page_end);
+      ASSERT_GT(chunk.end(), chunk.begin());
+      ASSERT_EQ(chunk.end(), chunk.begin() + chunk.size());
+      last_chunk_begin = chunk.begin();
+      last_chunk_end = chunk.end();
+
+      ASSERT_EQ(chunk_id, chunk.header()->identifier.load().chunk_id);
+      ASSERT_EQ(writer_id, chunk.header()->identifier.load().writer_id);
+      ASSERT_EQ(packets_count, chunk.header()->packets_state.load().count);
+      ASSERT_EQ(flags, chunk.header()->packets_state.load().flags);
+      ASSERT_EQ(std::make_pair(packets_count, flags),
+                chunk.GetPacketCountAndFlags());
+
+      chunk.IncrementPacketCount();
+      ASSERT_EQ(packets_count + 1, chunk.header()->packets_state.load().count);
+
+      chunk.IncrementPacketCount();
+      ASSERT_EQ(packets_count + 2, chunk.header()->packets_state.load().count);
+
+      chunk.SetFlag(
+          SharedMemoryABI::ChunkHeader::kLastPacketContinuesOnNextChunk);
+      ASSERT_TRUE(
+          chunk.header()->packets_state.load().flags &
+          SharedMemoryABI::ChunkHeader::kLastPacketContinuesOnNextChunk);
+
+      // Reacquiring the same chunk should fail.
+      ASSERT_FALSE(abi.TryAcquireChunkForWriting(page_idx, chunk_idx,
+                                                 target_buffer, &header)
+                       .is_valid());
+    }
+
+    // Now release chunks and check the Release() logic.
+    for (size_t chunk_idx = 0; chunk_idx < num_chunks; chunk_idx++) {
+      Chunk& chunk = chunks[chunk_idx];
+
+      // ReleaseChunkAsComplete returns |page_idx| only if all chunks in the
+      // page are complete.
+      size_t res = abi.ReleaseChunkAsComplete(std::move(chunk));
+      if (chunk_idx == num_chunks - 1) {
+        ASSERT_EQ(page_idx, res);
+        ASSERT_TRUE(abi.is_page_complete(page_idx));
+      } else {
+        ASSERT_EQ(SharedMemoryABI::kInvalidPageIdx, res);
+        ASSERT_FALSE(abi.is_page_complete(page_idx));
+      }
+      ASSERT_EQ(SharedMemoryABI::kChunkComplete,
+                abi.GetChunkState(page_idx, chunk_idx));
+    }
+
+    // Now acquire all chunks for reading.
+    for (size_t chunk_idx = 0; chunk_idx < num_chunks; chunk_idx++) {
+      Chunk& chunk = chunks[chunk_idx];
+      // Acquiring with the wrong |target_buffer| should fail.
+      chunk =
+          abi.TryAcquireChunkForReading(page_idx, chunk_idx, target_buffer + 1);
+      ASSERT_FALSE(chunk.is_valid());
+
+      // Acquiring with the right |target_buffer| should succeed.
+      chunk = abi.TryAcquireChunkForReading(page_idx, chunk_idx, target_buffer);
+      ASSERT_TRUE(chunk.is_valid());
+      ASSERT_EQ(SharedMemoryABI::kChunkBeingRead,
+                abi.GetChunkState(page_idx, chunk_idx));
+    }
+
+    // Finally release all chunks as free.
+    for (size_t chunk_idx = 0; chunk_idx < num_chunks; chunk_idx++) {
+      Chunk& chunk = chunks[chunk_idx];
+
+      // ReleaseChunkAsFree returns |page_idx| only if all chunks in the
+      // page are free. If this was the last chunk in the page, the full page
+      // should be marked as free.
+      size_t res = abi.ReleaseChunkAsFree(std::move(chunk));
+      if (chunk_idx == num_chunks - 1) {
+        ASSERT_EQ(page_idx, res);
+        ASSERT_TRUE(abi.is_page_free(page_idx));
+      } else {
+        ASSERT_EQ(SharedMemoryABI::kInvalidPageIdx, res);
+        ASSERT_FALSE(abi.is_page_free(page_idx));
+        ASSERT_EQ(SharedMemoryABI::kChunkFree,
+                  abi.GetChunkState(page_idx, chunk_idx));
+      }
+    }
+  }
+}
+
+TEST_P(SharedMemoryABITest, BatchAcquireAndRelease) {
+  SharedMemoryABI abi(buf_.get(), buf_size_, page_size_);
+  ChunkHeader header{};
+
+  // TryAcquire on a non-partitioned page should fail.
+  ASSERT_FALSE(abi.TryAcquireChunkForWriting(0, 0, 0, &header).is_valid());
+  ASSERT_FALSE(abi.TryAcquireChunkForReading(0, 0, 0).is_valid());
+
+  // Now partition the page in one whole chunk.
+  ASSERT_TRUE(abi.TryPartitionPage(0, SharedMemoryABI::kPageDiv1, 10));
+
+  Chunk chunk = abi.TryAcquireChunkForWriting(0, 0, 10, &header);
+  ASSERT_TRUE(chunk.is_valid());
+
+  // TryAcquireAllChunksForReading() should fail, as the chunk is being written.
+  ASSERT_FALSE(abi.TryAcquireAllChunksForReading(0));
+
+  ASSERT_EQ(0u, abi.ReleaseChunkAsComplete(std::move(chunk)));
+
+  // TryAcquireAllChunksForReading() should succeed given that the page has only
+  // one chunk and is now complete.
+  ASSERT_TRUE(abi.TryAcquireAllChunksForReading(0));
+
+  // Release the one chunk and check that the page is freed up.
+  abi.ReleaseAllChunksAsFree(0);
+  ASSERT_TRUE(abi.is_page_free(0));
+
+  // Now repartition the page into four chunks and try some trickier cases.
+  ASSERT_TRUE(abi.TryPartitionPage(0, SharedMemoryABI::kPageDiv4, 10));
+
+  // Acquire only the first and last chunks.
+  Chunk chunk0 = abi.TryAcquireChunkForWriting(0, 0, 10, &header);
+  ASSERT_TRUE(chunk0.is_valid());
+  Chunk chunk3 = abi.TryAcquireChunkForWriting(0, 3, 10, &header);
+  ASSERT_TRUE(chunk3.is_valid());
+
+  // TryAcquireAllChunksForReading() should fail, some chunks are being written.
+  ASSERT_FALSE(abi.TryAcquireAllChunksForReading(0));
+
+  // Mark only one chunk as complete and try again; it should still fail.
+  // Note: calls to ReleaseChunkAsComplete() will return kInvalidPageIdx
+  // because not all chunks are complete (the two middle ones remain free).
+  ASSERT_EQ(SharedMemoryABI::kInvalidPageIdx,
+            abi.ReleaseChunkAsComplete(std::move(chunk0)));
+
+  ASSERT_EQ(SharedMemoryABI::kChunkComplete, abi.GetChunkState(0, 0));
+  ASSERT_EQ(SharedMemoryABI::kChunkFree, abi.GetChunkState(0, 1));
+  ASSERT_EQ(SharedMemoryABI::kChunkFree, abi.GetChunkState(0, 2));
+  ASSERT_EQ(SharedMemoryABI::kChunkBeingWritten, abi.GetChunkState(0, 3));
+  ASSERT_FALSE(abi.TryAcquireAllChunksForReading(0));
+
+  // Now release also the last chunk as complete and try again the
+  // TryAcquireAllChunksForReading(). This time it should succeed.
+  ASSERT_EQ(SharedMemoryABI::kInvalidPageIdx,
+            abi.ReleaseChunkAsComplete(std::move(chunk3)));
+
+  ASSERT_EQ(SharedMemoryABI::kChunkComplete, abi.GetChunkState(0, 0));
+  ASSERT_EQ(SharedMemoryABI::kChunkFree, abi.GetChunkState(0, 1));
+  ASSERT_EQ(SharedMemoryABI::kChunkFree, abi.GetChunkState(0, 2));
+  ASSERT_EQ(SharedMemoryABI::kChunkComplete, abi.GetChunkState(0, 3));
+  ASSERT_TRUE(abi.TryAcquireAllChunksForReading(0));
+
+  // At this point the two outer chunks should transition into the
+  // kChunkBeingRead state, while the middle ones should stay free.
+  ASSERT_EQ(SharedMemoryABI::kChunkBeingRead, abi.GetChunkState(0, 0));
+  ASSERT_EQ(SharedMemoryABI::kChunkFree, abi.GetChunkState(0, 1));
+  ASSERT_EQ(SharedMemoryABI::kChunkFree, abi.GetChunkState(0, 2));
+  ASSERT_EQ(SharedMemoryABI::kChunkBeingRead, abi.GetChunkState(0, 3));
+
+  // Release only one chunk as free.
+  abi.ReleaseChunkAsFree(abi.GetChunkUnchecked(0, abi.page_layout_dbg(0), 0));
+  ASSERT_EQ(SharedMemoryABI::kChunkFree, abi.GetChunkState(0, 0));
+  ASSERT_EQ(SharedMemoryABI::kChunkBeingRead, abi.GetChunkState(0, 3));
+
+  // Release the last chunk as free, the full page should be freed.
+  abi.ReleaseChunkAsFree(abi.GetChunkUnchecked(0, abi.page_layout_dbg(0), 3));
+  ASSERT_TRUE(abi.is_page_free(0));
+}
+
+}  // namespace
+}  // namespace perfetto