Merge "Incremental installations in adb, DataLoader/device side."
diff --git a/core/java/android/os/incremental/V4Signature.java b/core/java/android/os/incremental/V4Signature.java
index 6450a67..6516917 100644
--- a/core/java/android/os/incremental/V4Signature.java
+++ b/core/java/android/os/incremental/V4Signature.java
@@ -16,6 +16,7 @@
 
 package android.os.incremental;
 
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -47,6 +48,15 @@
     }
 
     /**
+     * Construct a V4Signature from raw .idsig file bytes; {@code bytes} must be non-null.
+     */
+    public static V4Signature readFrom(byte[] bytes) throws IOException {
+        try (DataInputStream stream = new DataInputStream(new ByteArrayInputStream(bytes))) {
+            return readFrom(stream);
+        }
+    }
+
+    /**
      * Store the V4Signature to a byte-array.
      */
     public byte[] toByteArray() {
diff --git a/services/core/java/com/android/server/pm/PackageManagerShellCommand.java b/services/core/java/com/android/server/pm/PackageManagerShellCommand.java
index c267cea..f1e403b 100644
--- a/services/core/java/com/android/server/pm/PackageManagerShellCommand.java
+++ b/services/core/java/com/android/server/pm/PackageManagerShellCommand.java
@@ -119,6 +119,7 @@
 import java.net.URISyntaxException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Base64;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -135,7 +136,6 @@
 class PackageManagerShellCommand extends ShellCommand {
     /** Path for streaming APK content */
     private static final String STDIN_PATH = "-";
-    private static final byte[] STDIN_PATH_BYTES = "-".getBytes(StandardCharsets.UTF_8);
     /** Path where ART profiles snapshots are dumped for the shell user */
     private final static String ART_PROFILE_SNAPSHOT_DEBUG_LOCATION = "/data/misc/profman/";
     private static final int DEFAULT_WAIT_MS = 60 * 1000;
@@ -2988,8 +2988,10 @@
         try {
             // 1. Single file from stdin.
             if (args.isEmpty() || STDIN_PATH.equals(args.get(0))) {
-                String name = "base." + (isApex ? "apex" : "apk");
-                session.addFile(LOCATION_DATA_APP, name, sessionSizeBytes, STDIN_PATH_BYTES, null);
+                final String name = "base." + (isApex ? "apex" : "apk");
+                final String metadata = "-" + name;
+                session.addFile(LOCATION_DATA_APP, name, sessionSizeBytes,
+                        metadata.getBytes(StandardCharsets.UTF_8), null);
                 return 0;
             }
 
@@ -2998,24 +3000,58 @@
 
                 // 2. File with specified size read from stdin.
                 if (delimLocation != -1) {
-                    String name = arg.substring(0, delimLocation);
-                    String sizeStr = arg.substring(delimLocation + 1);
-                    long sizeBytes;
+                    final String[] fileDesc = arg.split(":");
+                    String name = null;
+                    long sizeBytes = -1;
+                    String metadata;
+                    byte[] signature = null;
+
+                    try {
+                        if (fileDesc.length > 0) {
+                            name = fileDesc[0];
+                        }
+                        if (fileDesc.length > 1) {
+                            sizeBytes = Long.parseUnsignedLong(fileDesc[1]);
+                        }
+                        if (fileDesc.length > 2 && !TextUtils.isEmpty(fileDesc[2])) {
+                            metadata = fileDesc[2];
+                        } else {
+                            metadata = name;
+                        }
+                        if (fileDesc.length > 3) {
+                            signature = Base64.getDecoder().decode(fileDesc[3]);
+                        }
+                    } catch (IllegalArgumentException e) {
+                        getErrPrintWriter().println(
+                                "Unable to parse file parameters: " + arg + ", reason: " + e);
+                        return 1;
+                    }
 
                     if (TextUtils.isEmpty(name)) {
                         getErrPrintWriter().println("Empty file name in: " + arg);
                         return 1;
                     }
+
+                    if (signature != null) {
+                        // Streaming/adb mode: the V4 signature is validated below.
+                        metadata = "+" + metadata;
+                    } else {
+                        // Singleshot read from stdin: there is no V4 signature to validate.
+                        metadata = "-" + metadata;
+                    }
+
                     try {
-                        sizeBytes = Long.parseUnsignedLong(sizeStr);
-                    } catch (NumberFormatException e) {
-                        getErrPrintWriter().println("Unable to parse size from: " + arg);
+                        if (signature != null && V4Signature.readFrom(signature) == null) {
+                            getErrPrintWriter().println("V4 signature is invalid in: " + arg);
+                            return 1;
+                        }
+                    } catch (Exception e) {
+                        getErrPrintWriter().println("V4 signature is invalid: " + e + " in " + arg);
                         return 1;
                     }
 
-                    // Incremental requires unique metadatas, let's add a name to the dash.
                     session.addFile(LOCATION_DATA_APP, name, sizeBytes,
-                            ("-" + name).getBytes(StandardCharsets.UTF_8), null);
+                            metadata.getBytes(StandardCharsets.UTF_8), signature);
                     continue;
                 }
 
diff --git a/services/core/jni/com_android_server_pm_PackageManagerShellCommandDataLoader.cpp b/services/core/jni/com_android_server_pm_PackageManagerShellCommandDataLoader.cpp
index 7e6f79f..70a9c09 100644
--- a/services/core/jni/com_android_server_pm_PackageManagerShellCommandDataLoader.cpp
+++ b/services/core/jni/com_android_server_pm_PackageManagerShellCommandDataLoader.cpp
@@ -18,18 +18,27 @@
 #define LOG_TAG "PackageManagerShellCommandDataLoader-jni"
 #include <android-base/logging.h>
 
+#include <android-base/file.h>
+#include <android-base/stringprintf.h>
 #include <android-base/unique_fd.h>
-#include <nativehelper/JNIHelp.h>
-#include "android-base/file.h"
+#include <cutils/trace.h>
+#include <sys/eventfd.h>
+#include <sys/poll.h>
 
-#include <endian.h>
+#include <nativehelper/JNIHelp.h>
 
 #include <core_jni_helpers.h>
+#include <endian.h>
 
 #include "dataloader.h"
 
+#include <charconv>
 #include <chrono>
+#include <span>
+#include <string>
 #include <thread>
+#include <unordered_map>
+#include <unordered_set>
 
 namespace android {
 
@@ -39,9 +48,26 @@
 using android::base::ReadFully;
 using android::base::unique_fd;
 
+using namespace std::literals;
+
+using BlockSize = int16_t;
+using FileIdx = int16_t;
+using BlockIdx = int32_t;
+using NumBlocks = int32_t;
+using CompressionType = int16_t;
+using RequestType = int16_t;
+using MagicType = uint32_t;
+
 static constexpr int BUFFER_SIZE = 256 * 1024;
 static constexpr int BLOCKS_COUNT = BUFFER_SIZE / INCFS_DATA_FILE_BLOCK_SIZE;
 
+static constexpr int COMMAND_SIZE = 4 + 2 + 2 + 4; // bytes
+static constexpr int HEADER_SIZE = 2 + 2 + 4 + 2;  // bytes
+static constexpr std::string_view OKAY = "OKAY"sv;
+static constexpr MagicType INCR = 0x52434e49; // BE INCR
+
+static constexpr auto PollTimeoutMs = 5000;
+
 struct JniIds {
     jclass packageManagerShellCommandDataLoader;
     jmethodID pmscdLookupShellCommand;
@@ -85,6 +111,70 @@
     return ids;
 }
 
+struct BlockHeader {
+    FileIdx fileIdx = -1;
+    CompressionType compressionType = -1;
+    BlockIdx blockIdx = -1;
+    BlockSize blockSize = -1;
+} __attribute__((packed));
+
+static_assert(sizeof(BlockHeader) == HEADER_SIZE);
+
+static constexpr RequestType EXIT = 0;
+static constexpr RequestType BLOCK_MISSING = 1;
+static constexpr RequestType PREFETCH = 2;
+
+struct RequestCommand {
+    MagicType magic;
+    RequestType requestType;
+    FileIdx fileIdx;
+    BlockIdx blockIdx;
+} __attribute__((packed));
+
+static_assert(COMMAND_SIZE == sizeof(RequestCommand));
+
+static bool sendRequest(int fd, RequestType requestType, FileIdx fileIdx = -1,
+                        BlockIdx blockIdx = -1) {
+    // Fields go out big-endian, so convert host -> BE (htobe*), not BE -> host (be*toh).
+    const RequestCommand command{INCR, static_cast<RequestType>(htobe16(requestType)),
+                                 static_cast<FileIdx>(htobe16(fileIdx)),
+                                 static_cast<BlockIdx>(htobe32(blockIdx))};
+    return android::base::WriteFully(fd, &command, sizeof(command));
+}
+
+static int waitForDataOrSignal(int fd, int event_fd) {
+    struct pollfd pfds[2] = {{fd, POLLIN, 0}, {event_fd, POLLIN, 0}};
+    // Wait indefinitely until either data is ready or stop signal is received
+    int res = poll(pfds, 2, PollTimeoutMs);
+    if (res <= 0) {
+        return res;
+    }
+    // First check if there is a stop signal
+    if (pfds[1].revents == POLLIN) {
+        return event_fd;
+    }
+    // Otherwise check if incoming data is ready
+    if (pfds[0].revents == POLLIN) {
+        return fd;
+    }
+    return -1;
+}
+
+static bool readChunk(int fd, std::vector<uint8_t>& data) {
+    int32_t size;
+    if (!android::base::ReadFully(fd, &size, sizeof(size))) {
+        return false;
+    }
+    size = int32_t(be32toh(size));
+    if (size <= 0) {
+        return false;
+    }
+    data.resize(size);
+    return android::base::ReadFully(fd, data.data(), data.size());
+}
+
+BlockHeader readHeader(std::span<uint8_t>& data);
+
 static inline int32_t readBEInt32(borrowed_fd fd) {
     int32_t result;
     ReadFully(fd, &result, sizeof(result));
@@ -106,6 +196,22 @@
     return readBEInt32(fd); // size of the verity tree
 }
 
+// Computes the byte size of the SHA-256 verity tree for a file of |fileSize| bytes: each level
+// holds one digest per block of the level below, up to the first level that fits in one block.
+static inline IncFsSize verityTreeSizeForFile(IncFsSize fileSize) {
+    constexpr int kDigestSize = 32; // SHA-256
+    constexpr int kHashesPerBlock = INCFS_DATA_FILE_BLOCK_SIZE / kDigestSize;
+
+    IncFsSize treeBlocks = 0;
+    // Start from the data block count (file size rounded up to whole blocks).
+    auto levelBlocks = 1 + (fileSize - 1) / INCFS_DATA_FILE_BLOCK_SIZE;
+    while (levelBlocks > 1) {
+        levelBlocks = (levelBlocks + kHashesPerBlock - 1) / kHashesPerBlock;
+        treeBlocks += levelBlocks;
+    }
+    return treeBlocks * INCFS_DATA_FILE_BLOCK_SIZE;
+}
+
 static inline unique_fd convertPfdToFdAndDup(JNIEnv* env, const JniIds& jni, jobject pfd) {
     if (!pfd) {
         ALOGE("Missing In ParcelFileDescriptor.");
@@ -125,8 +231,9 @@
 struct InputDesc {
     unique_fd fd;
     IncFsSize size;
-    IncFsBlockKind kind;
-    bool waitOnEof;
+    IncFsBlockKind kind = INCFS_BLOCK_KIND_DATA;
+    bool waitOnEof = false;
+    bool streaming = false;
 };
 using InputDescs = std::vector<InputDesc>;
 
@@ -135,8 +242,7 @@
     InputDescs result;
     result.reserve(2);
 
-    const bool fromStdin = (metadata.size == 0 || *metadata.data == '-');
-    if (fromStdin) {
+    if (metadata.size == 0 || *metadata.data == '-') {
         // stdin
         auto fd = convertPfdToFdAndDup(
                 env, jni,
@@ -146,12 +252,29 @@
             result.push_back(InputDesc{
                     .fd = std::move(fd),
                     .size = size,
-                    .kind = INCFS_BLOCK_KIND_DATA,
                     .waitOnEof = true,
             });
         }
         return result;
     }
+    if (*metadata.data == '+') {
+        // verity tree from stdin, rest is streaming
+        auto fd = convertPfdToFdAndDup(
+                env, jni,
+                env->CallStaticObjectMethod(jni.packageManagerShellCommandDataLoader,
+                                            jni.pmscdGetStdInPFD, shellCommand));
+        if (fd.ok()) {
+            auto treeSize = verityTreeSizeForFile(size);
+            result.push_back(InputDesc{
+                    .fd = std::move(fd),
+                    .size = treeSize,
+                    .kind = INCFS_BLOCK_KIND_HASH,
+                    .waitOnEof = true,
+                    .streaming = true,
+            });
+        }
+        return result;
+    }
 
     // local file and possibly signature
     const std::string filePath(metadata.data, metadata.size);
@@ -163,13 +286,17 @@
                                         jni.pmscdGetLocalFile, shellCommand,
                                         env->NewStringUTF(idsigPath.c_str())));
     if (idsigFd.ok()) {
-        ALOGE("idsig found, skipping to the tree");
-        auto treeSize = skipIdSigHeaders(idsigFd);
+        auto treeSize = verityTreeSizeForFile(size);
+        auto actualTreeSize = skipIdSigHeaders(idsigFd);
+        if (treeSize != actualTreeSize) {
+            ALOGE("Verity tree size mismatch: %d vs .idsig: %d.", int(treeSize),
+                  int(actualTreeSize));
+            return {};
+        }
         result.push_back(InputDesc{
                 .fd = std::move(idsigFd),
                 .size = treeSize,
                 .kind = INCFS_BLOCK_KIND_HASH,
-                .waitOnEof = false,
         });
     }
 
@@ -182,8 +309,6 @@
         result.push_back(InputDesc{
                 .fd = std::move(fileFd),
                 .size = size,
-                .kind = INCFS_BLOCK_KIND_DATA,
-                .waitOnEof = false,
         });
     }
 
@@ -226,19 +351,32 @@
                   android::dataloader::StatusListenerPtr statusListener,
                   android::dataloader::ServiceConnectorPtr,
                   android::dataloader::ServiceParamsPtr) final {
+        CHECK(ifs) << "ifs can't be null";
+        CHECK(statusListener) << "statusListener can't be null";
         mArgs = params.arguments();
         mIfs = ifs;
+        mStatusListener = statusListener;
         return true;
     }
     bool onStart() final { return true; }
-    void onStop() final {}
-    void onDestroy() final {}
+    void onStop() final {
+        mStopReceiving = true;
+        eventfd_write(mEventFd, 1);
+        if (mReceiverThread.joinable()) {
+            mReceiverThread.join();
+        }
+    }
+    void onDestroy() final {
+        ALOGE("Sending EXIT to server.");
+        sendRequest(mOutFd, EXIT);
+        // Make sure the receiver thread stopped.
+        CHECK(!mReceiverThread.joinable());
 
-    // IFS callbacks.
-    void onPendingReads(const dataloader::PendingReads& pendingReads) final {}
-    void onPageReads(const dataloader::PageReads& pageReads) final {}
+        mInFd.reset();
+        mOutFd.reset();
+    }
 
-    // FS callbacks.
+    // Installation.
     bool onPrepareImage(const dataloader::DataLoaderInstallationFiles& addedFiles) final {
         JNIEnv* env = GetOrAttachJNIEnvironment(mJvm);
         const auto& jni = jniIds(env);
@@ -257,6 +395,7 @@
         std::vector<IncFsDataBlock> blocks;
         blocks.reserve(BLOCKS_COUNT);
 
+        unique_fd streamingFd;
         for (auto&& file : addedFiles) {
             auto inputs = openInputs(env, jni, shellCommand, file.size, file.metadata);
             if (inputs.empty()) {
@@ -267,7 +406,6 @@
             }
 
             const auto fileId = IncFs_FileIdFromMetadata(file.metadata);
-
             const auto incfsFd(mIfs->openWrite(fileId));
             if (incfsFd < 0) {
                 ALOGE("Failed to open an IncFS file for metadata: %.*s, final file name is: %s. "
@@ -277,6 +415,9 @@
             }
 
             for (auto&& input : inputs) {
+                if (input.streaming && !streamingFd.ok()) {
+                    streamingFd.reset(dup(input.fd));
+                }
                 if (!copyToIncFs(incfsFd, input.size, input.kind, input.fd, input.waitOnEof,
                                  &buffer, &blocks)) {
                     ALOGE("Failed to copy data to IncFS file for metadata: %.*s, final file name "
@@ -288,7 +429,12 @@
             }
         }
 
-        ALOGE("All done.");
+        if (streamingFd.ok()) {
+            ALOGE("onPrepareImage: done, proceeding to streaming.");
+            return initStreaming(std::move(streamingFd));
+        }
+
+        ALOGE("onPrepareImage: done.");
         return true;
     }
 
@@ -378,11 +524,253 @@
         return true;
     }
 
+    // Read tracing.
+    struct TracedRead {
+        uint64_t timestampUs;
+        android::dataloader::FileId fileId;
+        uint32_t firstBlockIdx;
+        uint32_t count;
+    };
+
+    void onPageReads(const android::dataloader::PageReads& pageReads) final {
+        auto trace = atrace_is_tag_enabled(ATRACE_TAG);
+        if (CC_LIKELY(!trace)) {
+            return;
+        }
+
+        TracedRead last = {};
+        for (auto&& read : pageReads) {
+            if (read.id != last.fileId || read.block != last.firstBlockIdx + last.count) {
+                traceRead(last);
+                last = TracedRead{
+                        .timestampUs = read.bootClockTsUs,
+                        .fileId = read.id,
+                        .firstBlockIdx = (uint32_t)read.block,
+                        .count = 1,
+                };
+            } else {
+                ++last.count;
+            }
+        }
+        traceRead(last);
+    }
+
+    void traceRead(const TracedRead& read) {
+        if (!read.count) {
+            return;
+        }
+
+        FileIdx fileIdx = convertFileIdToFileIndex(read.fileId);
+        auto str = android::base::StringPrintf("page_read: index=%lld count=%lld file=%d",
+                                               static_cast<long long>(read.firstBlockIdx),
+                                               static_cast<long long>(read.count),
+                                               static_cast<int>(fileIdx));
+        ATRACE_BEGIN(str.c_str());
+        ATRACE_END();
+    }
+
+    // Streaming.
+    bool initStreaming(unique_fd inout) {
+        mInFd.reset(dup(inout));
+        mOutFd.reset(dup(inout));
+        if (mInFd < 0 || mOutFd < 0) {
+            ALOGE("Failed to dup FDs.");
+            return false;
+        }
+
+        mEventFd.reset(eventfd(0, EFD_CLOEXEC));
+        if (mEventFd < 0) {
+            ALOGE("Failed to create eventfd.");
+            return false;
+        }
+
+        // Awaiting adb handshake.
+        char okay_buf[OKAY.size()];
+        if (!android::base::ReadFully(mInFd, okay_buf, OKAY.size())) {
+            ALOGE("Failed to receive OKAY. Abort.");
+            return false;
+        }
+        if (std::string_view(okay_buf, OKAY.size()) != OKAY) {
+            ALOGE("Received '%.*s', expecting '%.*s'", (int)OKAY.size(), okay_buf, (int)OKAY.size(),
+                  OKAY.data());
+            return false;
+        }
+
+        mReceiverThread = std::thread([this]() { receiver(); });
+        ALOGI("Started streaming...");
+        return true;
+    }
+
+    // IFS callbacks.
+    void onPendingReads(const dataloader::PendingReads& pendingReads) final {
+        CHECK(mIfs);
+        for (auto&& pendingRead : pendingReads) {
+            const android::dataloader::FileId& fileId = pendingRead.id;
+            const auto blockIdx = static_cast<BlockIdx>(pendingRead.block);
+            /*
+            ALOGI("Missing: %d", (int) blockIdx);
+            */
+            FileIdx fileIdx = convertFileIdToFileIndex(fileId);
+            if (fileIdx < 0) {
+                ALOGE("Failed to handle event for fileid=%s. Ignore.",
+                      android::incfs::toString(fileId).c_str());
+                continue;
+            }
+            if (mRequestedFiles.insert(fileIdx).second) {
+                if (!sendRequest(mOutFd, PREFETCH, fileIdx, blockIdx)) {
+                    ALOGE("Failed to request prefetch for fileid=%s. Ignore.",
+                          android::incfs::toString(fileId).c_str());
+                    mRequestedFiles.erase(fileIdx);
+                    mStatusListener->reportStatus(DATA_LOADER_NO_CONNECTION);
+                }
+            }
+            sendRequest(mOutFd, BLOCK_MISSING, fileIdx, blockIdx);
+        }
+    }
+
+    // Receiver thread: polls for input, parses each chunk into blocks and writes them to IncFS.
+    void receiver() {
+        std::vector<uint8_t> data;
+        std::vector<IncFsDataBlock> instructions;
+        std::unordered_map<FileIdx, unique_fd> writeFds;
+        while (!mStopReceiving) {
+            const int res = waitForDataOrSignal(mInFd, mEventFd);
+            if (res == 0) {
+                continue;
+            }
+            if (res < 0) {
+                ALOGE("Failed to poll. Abort.");
+                mStatusListener->reportStatus(DATA_LOADER_NO_CONNECTION);
+                break;
+            }
+            if (res == mEventFd) {
+                ALOGE("Received stop signal. Exit.");
+                break;
+            }
+            if (!readChunk(mInFd, data)) {
+                ALOGE("Failed to read a message. Abort.");
+                mStatusListener->reportStatus(DATA_LOADER_NO_CONNECTION);
+                break;
+            }
+            auto remainingData = std::span(data);
+            while (!remainingData.empty()) {
+                auto header = readHeader(remainingData);
+                if (header.fileIdx == -1 && header.compressionType == 0 && header.blockIdx == 0 &&
+                    header.blockSize == 0) {
+                    ALOGI("Stop signal received. Sending exit command (remaining bytes: %d).",
+                          int(remainingData.size()));
+                    sendRequest(mOutFd, EXIT);
+                    mStopReceiving = true;
+                    break;
+                }
+                // Also reject a block whose payload claims more bytes than the chunk still holds.
+                if (header.fileIdx < 0 || header.blockSize <= 0 || header.compressionType < 0 ||
+                    header.blockIdx < 0 || size_t(header.blockSize) > remainingData.size()) {
+                    ALOGE("invalid header received. Abort.");
+                    mStopReceiving = true;
+                    break;
+                }
+                const FileIdx fileIdx = header.fileIdx;
+                const android::dataloader::FileId fileId = convertFileIndexToFileId(fileIdx);
+                if (!android::incfs::isValidFileId(fileId)) {
+                    ALOGE("Unknown data destination for file ID %d. Ignore.", header.fileIdx);
+                    remainingData = remainingData.subspan(header.blockSize); // skip its payload
+                    continue;
+                }
+
+                auto& writeFd = writeFds[fileIdx];
+                if (writeFd < 0) {
+                    writeFd = this->mIfs->openWrite(fileId);
+                    if (writeFd < 0) {
+                        ALOGE("Failed to open file %d for writing (%d). Aboring.", header.fileIdx,
+                              -writeFd);
+                        break;
+                    }
+                }
+
+                const auto inst = IncFsDataBlock{
+                        .fileFd = writeFd,
+                        .pageIndex = static_cast<IncFsBlockIndex>(header.blockIdx),
+                        .compression = static_cast<IncFsCompressionKind>(header.compressionType),
+                        .kind = INCFS_BLOCK_KIND_DATA,
+                        .dataSize = static_cast<uint16_t>(header.blockSize),
+                        .data = (const char*)remainingData.data(),
+                };
+                instructions.push_back(inst);
+                remainingData = remainingData.subspan(header.blockSize);
+            }
+            writeInstructions(instructions);
+        }
+        writeInstructions(instructions);
+    }
+
+    void writeInstructions(std::vector<IncFsDataBlock>& instructions) {
+        auto res = this->mIfs->writeBlocks(instructions);
+        if (res != instructions.size()) {
+            ALOGE("Dailed to write data to Incfs (res=%d when expecting %d)", res,
+                  int(instructions.size()));
+        }
+        instructions.clear();
+    }
+
+    FileIdx convertFileIdToFileIndex(android::dataloader::FileId fileId) {
+        // FileId is a string in format '+FileIdx\0'.
+        const char* meta = (const char*)&fileId;
+        if (*meta != '+') {
+            return -1;
+        }
+
+        int fileIdx;
+        auto res = std::from_chars(meta + 1, meta + sizeof(fileId), fileIdx);
+        if (res.ec != std::errc{} || fileIdx < std::numeric_limits<FileIdx>::min() ||
+            fileIdx > std::numeric_limits<FileIdx>::max()) {
+            return -1;
+        }
+
+        return FileIdx(fileIdx);
+    }
+
+    android::dataloader::FileId convertFileIndexToFileId(FileIdx fileIdx) {
+        IncFsFileId fileId = {};
+        char* meta = (char*)&fileId;
+        *meta = '+';
+        if (auto [p, ec] = std::to_chars(meta + 1, meta + sizeof(fileId), fileIdx);
+            ec != std::errc()) {
+            return {};
+        }
+        return fileId;
+    }
+
     JavaVM* const mJvm;
     std::string mArgs;
-    android::dataloader::FilesystemConnectorPtr mIfs;
+    android::dataloader::FilesystemConnectorPtr mIfs = nullptr;
+    android::dataloader::StatusListenerPtr mStatusListener = nullptr;
+    android::base::unique_fd mInFd;
+    android::base::unique_fd mOutFd;
+    android::base::unique_fd mEventFd;
+    std::thread mReceiverThread;
+    std::atomic<bool> mStopReceiving = false;
+    /** Tracks which files have been requested */
+    std::unordered_set<FileIdx> mRequestedFiles;
 };
 
+// Pops a big-endian BlockHeader off |data|; short input yields the default (all -1) header.
+BlockHeader readHeader(std::span<uint8_t>& data) {
+    BlockHeader header;
+    if (data.size() < sizeof(header)) {
+        return header;
+    }
+    // Assemble bytewise: a header following a payload may be unaligned for 16/32-bit loads.
+    const auto be16 = [&data](size_t at) { return uint16_t((data[at] << 8) | data[at + 1]); };
+    const auto be32 = [&be16](size_t at) { return (uint32_t(be16(at)) << 16) | be16(at + 2); };
+    header.fileIdx = static_cast<FileIdx>(be16(0));
+    header.compressionType = static_cast<CompressionType>(be16(2));
+    header.blockIdx = static_cast<BlockIdx>(be32(4));
+    header.blockSize = static_cast<BlockSize>(be16(8));
+    data = data.subspan(sizeof(header));
+    return header;
+}
+
 static void nativeInitialize(JNIEnv* env, jclass klass) {
     jniIds(env);
 }