| // Copyright (C) 2019 The Android Open Source Project |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| #include "prefetcher/minijail.h" |
| #include "common/cmd_utils.h" |
| #include "prefetcher/prefetcher_daemon.h" |
| #include "prefetcher/session_manager.h" |
| #include "prefetcher/session.h" |
| |
| #include <android-base/logging.h> |
| #include <android-base/properties.h> |
| |
| #include <deque> |
| #include <iomanip> |
| #include <string> |
| #include <sstream> |
| #include <vector> |
| |
| #include <fcntl.h> |
| #include <string.h> |
| #include <sys/socket.h> |
| #include <sys/types.h> |
| #include <sys/wait.h> |
| #include <sys/un.h> |
| #include <unistd.h> |
| |
| namespace iorap::prefetcher { |
| |
| // Gate super-spammy IPC logging behind a property. |
| // This is beyond merely annoying, enabling this logging causes prefetching to be about 1000x slower. |
| static bool LogVerboseIpc() { |
| static bool initialized = false; |
| static bool verbose_ipc; |
| |
| if (initialized == false) { |
| initialized = true; |
| |
| verbose_ipc = |
| ::android::base::GetBoolProperty("iorapd.readahead.verbose_ipc", /*default*/false); |
| } |
| |
| return verbose_ipc; |
| } |
| |
| static const bool kInstallMiniJail = |
| ::android::base::GetBoolProperty("iorapd.readahead.minijail", /*default*/true); |
| |
| static constexpr const char kCommandFileName[] = "/system/bin/iorap.prefetcherd"; |
| |
| static constexpr size_t kPipeBufferSize = 1024 * 1024; // matches /proc/sys/fs/pipe-max-size |
| |
| using ArgString = const char*; |
| |
| std::ostream& operator<<(std::ostream& os, ReadAheadKind ps) { |
| switch (ps) { |
| case ReadAheadKind::kFadvise: |
| os << "fadvise"; |
| break; |
| case ReadAheadKind::kMmapLocked: |
| os << "mmap"; |
| break; |
| case ReadAheadKind::kMlock: |
| os << "mlock"; |
| break; |
| default: |
| os << "<invalid>"; |
| } |
| return os; |
| } |
| |
| std::ostream& operator<<(std::ostream& os, CommandChoice choice) { |
| switch (choice) { |
| case CommandChoice::kRegisterFilePath: |
| os << "kRegisterFilePath"; |
| break; |
| case CommandChoice::kUnregisterFilePath: |
| os << "kUnregisterFilePath"; |
| break; |
| case CommandChoice::kReadAhead: |
| os << "kReadAhead"; |
| break; |
| case CommandChoice::kExit: |
| os << "kExit"; |
| break; |
| case CommandChoice::kCreateSession: |
| os << "kCreateSession"; |
| break; |
| case CommandChoice::kDestroySession: |
| os << "kDestroySession"; |
| break; |
| case CommandChoice::kDumpSession: |
| os << "kDumpSession"; |
| break; |
| case CommandChoice::kDumpEverything: |
| os << "kDumpEverything"; |
| break; |
| case CommandChoice::kCreateFdSession: |
| os << "kCreateFdSession"; |
| break; |
| default: |
| CHECK(false) << "forgot to handle this choice"; |
| break; |
| } |
| return os; |
| } |
| |
| std::ostream& operator<<(std::ostream& os, const Command& command) { |
| os << "Command{"; |
| os << "choice=" << command.choice << ","; |
| |
| bool has_session_id = true; |
| bool has_id = true; |
| switch (command.choice) { |
| case CommandChoice::kDumpEverything: |
| case CommandChoice::kExit: |
| has_session_id = false; |
| FALLTHROUGH_INTENDED; |
| case CommandChoice::kCreateFdSession: |
| case CommandChoice::kCreateSession: |
| case CommandChoice::kDestroySession: |
| case CommandChoice::kDumpSession: |
| has_id = false; |
| break; |
| default: |
| break; |
| } |
| |
| if (has_session_id) { |
| os << "sid=" << command.session_id << ","; |
| } |
| |
| if (has_id) { |
| os << "id=" << command.id << ","; |
| } |
| |
| switch (command.choice) { |
| case CommandChoice::kRegisterFilePath: |
| os << "file_path="; |
| |
| if (command.file_path) { |
| os << *(command.file_path); |
| } else { |
| os << "(nullopt)"; |
| } |
| break; |
| case CommandChoice::kUnregisterFilePath: |
| break; |
| case CommandChoice::kReadAhead: |
| os << "read_ahead_kind=" << command.read_ahead_kind << ","; |
| os << "length=" << command.length << ","; |
| os << "offset=" << command.offset << ","; |
| break; |
| case CommandChoice::kExit: |
| break; |
| case CommandChoice::kCreateFdSession: |
| os << "fd="; |
| if (command.fd.has_value()) { |
| os << command.fd.value(); |
| } else { |
| os << "(nullopt)"; |
| } |
| os << ","; |
| FALLTHROUGH_INTENDED; |
| case CommandChoice::kCreateSession: |
| os << "description="; |
| if (command.file_path) { |
| os << "'" << *(command.file_path) << "'"; |
| } else { |
| os << "(nullopt)"; |
| } |
| break; |
| case CommandChoice::kDestroySession: |
| break; |
| case CommandChoice::kDumpSession: |
| break; |
| case CommandChoice::kDumpEverything: |
| break; |
| default: |
| CHECK(false) << "forgot to handle this choice"; |
| break; |
| } |
| |
| os << "}"; |
| |
| return os; |
| } |
| |
| template <typename T> |
| struct ParseResult { |
| T value; |
| char* next_token; |
| size_t stream_size; |
| |
| ParseResult() : value{}, next_token{nullptr}, stream_size{} { |
| } |
| |
| constexpr operator bool() const { |
| return next_token != nullptr; |
| } |
| }; |
| |
| // Very spammy: Keep it off by default. Set to true if changing this code. |
| static constexpr bool kDebugParsingRead = false; |
| |
| #define DEBUG_PREAD if (kDebugParsingRead) LOG(VERBOSE) << "ParsingRead " |
| |
| |
| |
| // Parse a strong type T from a buffer stream. |
| // If there's insufficient space left to parse the value, an empty ParseResult is returned. |
| template <typename T> |
| ParseResult<T> ParsingRead(char* stream, size_t stream_size) { |
| if (stream == nullptr) { |
| DEBUG_PREAD << "stream was null"; |
| return {}; |
| } |
| |
| if constexpr (std::is_same_v<T, std::string>) { |
| ParseResult<uint32_t> length = ParsingRead<uint32_t>(stream, stream_size); |
| |
| if (!length) { |
| DEBUG_PREAD << "could not find length"; |
| // Not enough bytes left? |
| return {}; |
| } |
| |
| ParseResult<std::string> string_result; |
| string_result.value.reserve(length); |
| |
| stream = length.next_token; |
| stream_size = length.stream_size; |
| |
| for (size_t i = 0; i < length.value; ++i) { |
| ParseResult<char> char_result = ParsingRead<char>(stream, stream_size); |
| |
| stream = char_result.next_token; |
| stream_size = char_result.stream_size; |
| |
| if (!char_result) { |
| DEBUG_PREAD << "too few chars in stream, expected length: " << length.value; |
| // Not enough bytes left? |
| return {}; |
| } |
| |
| string_result.value += char_result.value; |
| |
| DEBUG_PREAD << "string preliminary is : " << string_result.value; |
| } |
| |
| DEBUG_PREAD << "parsed string to: " << string_result.value; |
| string_result.next_token = stream; |
| return string_result; |
| } else { |
| if (sizeof(T) > stream_size) { |
| return {}; |
| } |
| |
| ParseResult<T> result; |
| result.next_token = stream + sizeof(T); |
| result.stream_size = stream_size - sizeof(T); |
| |
| memcpy(&result.value, stream, sizeof(T)); |
| |
| return result; |
| } |
| } |
| |
| // Convenience overload to chain multiple ParsingRead together. |
| template <typename T, typename U> |
| ParseResult<T> ParsingRead(ParseResult<U> result) { |
| return ParsingRead<T>(result.next_token, result.stream_size); |
| } |
| |
| class CommandParser { |
| public: |
| CommandParser(PrefetcherForkParameters params) { |
| params_ = params; |
| } |
| |
| std::vector<Command> ParseSocketCommands(bool& eof) { |
| eof = false; |
| |
| std::vector<Command> commands_vec; |
| |
| std::vector<char> buf_vector; |
| buf_vector.resize(1024*1024); // 1MB. |
| char* buf = &buf_vector[0]; |
| |
| // Binary only parsing. The higher level code can parse text |
| // with ifstream if it really wants to. |
| char* stream = &buf[0]; |
| size_t stream_size = buf_vector.size(); |
| |
| while (true) { |
| if (stream_size == 0) { |
| // TODO: reply with an overflow command. |
| LOG(WARNING) << "prefetcher_daemon command overflow, dropping all commands."; |
| stream = &buf[0]; |
| stream_size = buf_vector.size(); |
| memset(&buf[0], /*c*/0, buf_vector.size()); |
| } |
| |
| if (LogVerboseIpc()) { |
| LOG(VERBOSE) << "PrefetcherDaemon block recvmsg for commands (fd=" << params_.input_fd << ")"; |
| } |
| |
| ssize_t count; |
| struct msghdr hdr; |
| memset(&hdr, 0, sizeof(hdr)); |
| |
| { |
| union { |
| struct cmsghdr cmh; |
| char control[CMSG_SPACE(sizeof(int))]; |
| } control_un; |
| memset(&control_un, 0, sizeof(control_un)); |
| |
| /* Set 'control_un' to describe ancillary data that we want to receive */ |
| control_un.cmh.cmsg_len = CMSG_LEN(sizeof(int)); /* fd is sizeof(int) */ |
| control_un.cmh.cmsg_level = SOL_SOCKET; |
| control_un.cmh.cmsg_type = SCM_CREDENTIALS; |
| |
| // the regular message data will be read into stream |
| struct iovec iov; |
| memset(&iov, 0, sizeof(iov)); |
| iov.iov_base = stream; |
| iov.iov_len = stream_size; |
| |
| /* Set hdr fields to describe 'control_un' */ |
| hdr.msg_control = control_un.control; |
| hdr.msg_controllen = sizeof(control_un.control); |
| hdr.msg_iov = &iov; |
| hdr.msg_iovlen = 1; |
| hdr.msg_name = nullptr; /* no peer address */ |
| hdr.msg_namelen = 0; |
| |
| count = TEMP_FAILURE_RETRY(recvmsg(params_.input_fd, &hdr, /*flags*/0)); |
| } |
| |
| if (LogVerboseIpc()) { |
| LOG(VERBOSE) << "PrefetcherDaemon recvmsg " << count << " for stream size:" << stream_size; |
| } |
| |
| if (count < 0) { |
| PLOG(ERROR) << "failed to recvmsg from input fd"; |
| break; |
| // TODO: let the daemon be restarted by higher level code? |
| } else if (count == 0) { |
| LOG(WARNING) << "prefetcher_daemon input_fd end-of-file; terminating"; |
| eof = true; |
| break; |
| // TODO: let the daemon be restarted by higher level code? |
| } |
| |
| { |
| /* Extract fd from ancillary data if present */ |
| struct cmsghdr* hp; |
| hp = CMSG_FIRSTHDR(&hdr); |
| if (hp && |
| // FIXME: hp->cmsg_len returns an absurdly large value. is it overflowing? |
| // (hp->cmsg_len == CMSG_LEN(sizeof(int))) && |
| (hp->cmsg_level == SOL_SOCKET) && |
| (hp->cmsg_type == SCM_RIGHTS)) { |
| |
| int passed_fd = *(int*) CMSG_DATA(hp); |
| if (LogVerboseIpc()) { |
| LOG(VERBOSE) << "PrefetcherDaemon received FD " << passed_fd; |
| } |
| |
| // tack the FD into our dequeue. |
| // we assume the FDs are sent in-order same as the regular iov are sent in-order. |
| longbuf_fds_.insert(longbuf_fds_.end(), passed_fd); |
| } else if (hp != nullptr) { |
| if (LogVerboseIpc()) { |
| LOG(VERBOSE) << "PrefetcherDaemon::read got CMSG but it wasn't matching SCM_RIGHTS," |
| << "cmsg_len=" << hp->cmsg_len << "," |
| << "cmsg_level=" << hp->cmsg_level << "," |
| << "cmsg_type=" << hp->cmsg_type; |
| } |
| } |
| } |
| |
| longbuf_.insert(longbuf_.end(), stream, stream + count); |
| if (LogVerboseIpc()) { |
| LOG(VERBOSE) << "PrefetcherDaemon updated longbuf size: " << longbuf_.size(); |
| } |
| |
| // reconstruct a stream of [iov_Command chdr_fd?]* back into [Command]* |
| { |
| if (longbuf_.size() == 0) { |
| break; |
| } |
| |
| std::vector<char> v(longbuf_.begin(), |
| longbuf_.end()); |
| |
| std::vector<int> v_fds{longbuf_fds_.begin(), longbuf_fds_.end()}; |
| |
| if (LogVerboseIpc()) { |
| LOG(VERBOSE) << "PrefetcherDaemon longbuf_ size: " << v.size(); |
| if (WOULD_LOG(VERBOSE)) { |
| std::stringstream dump; |
| dump << std::hex << std::setfill('0'); |
| for (size_t i = 0; i < v.size(); ++i) { |
| dump << std::setw(2) << static_cast<unsigned>(v[i]); |
| } |
| |
| LOG(VERBOSE) << "PrefetcherDaemon longbuf_ dump: " << dump.str(); |
| } |
| LOG(VERBOSE) << "PrefetcherDaemon longbuf_fds_ size: " << v_fds.size(); |
| if (WOULD_LOG(VERBOSE)) { |
| std::stringstream dump; |
| for (size_t i = 0; i < v_fds.size(); ++i) { |
| dump << v_fds[i] << ", "; |
| } |
| |
| LOG(VERBOSE) << "PrefetcherDaemon longbuf_fds_ dump: " << dump.str(); |
| } |
| |
| } |
| |
| size_t v_fds_off = 0; |
| size_t consumed_fds_total = 0; |
| |
| size_t v_off = 0; |
| size_t consumed_bytes = std::numeric_limits<size_t>::max(); |
| size_t consumed_total = 0; |
| |
| while (true) { |
| std::optional<Command> maybe_command; |
| maybe_command = Command::Read(&v[v_off], v.size() - v_off, &consumed_bytes); |
| consumed_total += consumed_bytes; |
| // Normal every time we get to the end of a buffer. |
| if (!maybe_command) { |
| if (LogVerboseIpc()) { |
| LOG(VERBOSE) << "failed to read command, v_off=" << v_off << ",v_size:" << v.size(); |
| } |
| break; |
| } |
| |
| if (maybe_command->RequiresFd()) { |
| if (v_fds_off < v_fds.size()) { |
| maybe_command->fd = v_fds[v_fds_off++]; |
| consumed_fds_total++; |
| if (LogVerboseIpc()) { |
| LOG(VERBOSE) << "Append the FD to " << *maybe_command; |
| } |
| } else { |
| LOG(WARNING) << "Failed to acquire FD for " << *maybe_command; |
| } |
| } |
| |
| // in the next pass ignore what we already consumed. |
| v_off += consumed_bytes; |
| |
| // true as long we don't hit the 'break' above. |
| DCHECK_EQ(v_off, consumed_total); |
| if (LogVerboseIpc()) { |
| LOG(VERBOSE) << "success to read command, v_off=" << v_off << ",v_size:" << v.size() |
| << "," << *maybe_command; |
| |
| // Pretty-print a single command for debugging/testing. |
| LOG(VERBOSE) << *maybe_command; |
| } |
| |
| // add to the commands we parsed. |
| commands_vec.push_back(*maybe_command); |
| } |
| |
| // erase however many were consumed |
| longbuf_.erase(longbuf_.begin(), longbuf_.begin() + consumed_total); |
| |
| // erase however many FDs were consumed. |
| longbuf_fds_.erase(longbuf_fds_.begin(), longbuf_fds_.begin() + consumed_fds_total); |
| } |
| break; |
| } |
| |
| return commands_vec; |
| } |
| |
| std::vector<Command> ParseCommands(bool& eof) { |
| eof = false; |
| |
| std::vector<Command> commands_vec; |
| |
| std::vector<char> buf_vector; |
| buf_vector.resize(kPipeBufferSize); |
| char* buf = &buf_vector[0]; |
| |
| // Binary only parsing. The higher level code can parse text |
| // with ifstream if it really wants to. |
| char* stream = &buf[0]; |
| size_t stream_size = buf_vector.size(); |
| |
| while (true) { |
| if (stream_size == 0) { |
| // TODO: reply with an overflow command. |
| LOG(WARNING) << "prefetcher_daemon command overflow, dropping all commands."; |
| stream = &buf[0]; |
| stream_size = buf_vector.size(); |
| memset(&buf[0], /*c*/0, buf_vector.size()); |
| } |
| |
| if (LogVerboseIpc()) { |
| LOG(VERBOSE) << "PrefetcherDaemon block read for commands (fd=" << params_.input_fd << ")"; |
| } |
| ssize_t count = TEMP_FAILURE_RETRY(read(params_.input_fd, stream, stream_size)); |
| if (LogVerboseIpc()) { |
| LOG(VERBOSE) << "PrefetcherDaemon::read " << count << " for stream size:" << stream_size; |
| } |
| |
| if (count < 0) { |
| PLOG(ERROR) << "failed to read from input fd"; |
| break; |
| // TODO: let the daemon be restarted by higher level code? |
| } else if (count == 0) { |
| LOG(WARNING) << "prefetcher_daemon input_fd end-of-file; terminating"; |
| eof = true; |
| break; |
| // TODO: let the daemon be restarted by higher level code? |
| } |
| |
| longbuf_.insert(longbuf_.end(), stream, stream + count); |
| if (LogVerboseIpc()) { |
| LOG(VERBOSE) << "PrefetcherDaemon updated longbuf size: " << longbuf_.size(); |
| } |
| |
| std::optional<Command> maybe_command; |
| { |
| if (longbuf_.size() == 0) { |
| break; |
| } |
| |
| std::vector<char> v(longbuf_.begin(), |
| longbuf_.end()); |
| |
| if (LogVerboseIpc()) { |
| LOG(VERBOSE) << "PrefetcherDaemon longbuf_ size: " << v.size(); |
| if (WOULD_LOG(VERBOSE)) { |
| std::stringstream dump; |
| dump << std::hex << std::setfill('0'); |
| for (size_t i = 0; i < v.size(); ++i) { |
| dump << std::setw(2) << static_cast<unsigned>(v[i]); |
| } |
| |
| LOG(VERBOSE) << "PrefetcherDaemon longbuf_ dump: " << dump.str(); |
| } |
| } |
| |
| size_t v_off = 0; |
| size_t consumed_bytes = std::numeric_limits<size_t>::max(); |
| size_t consumed_total = 0; |
| |
| while (true) { |
| maybe_command = Command::Read(&v[v_off], v.size() - v_off, &consumed_bytes); |
| consumed_total += consumed_bytes; |
| // Normal every time we get to the end of a buffer. |
| if (!maybe_command) { |
| if (LogVerboseIpc()) { |
| LOG(VERBOSE) << "failed to read command, v_off=" << v_off << ",v_size:" << v.size(); |
| } |
| break; |
| } |
| |
| // in the next pass ignore what we already consumed. |
| v_off += consumed_bytes; |
| |
| // true as long we don't hit the 'break' above. |
| DCHECK_EQ(v_off, consumed_total); |
| if (LogVerboseIpc()) { |
| LOG(VERBOSE) << "success to read command, v_off=" << v_off << ",v_size:" << v.size() |
| << "," << *maybe_command; |
| |
| // Pretty-print a single command for debugging/testing. |
| LOG(VERBOSE) << *maybe_command; |
| } |
| |
| // add to the commands we parsed. |
| commands_vec.push_back(*maybe_command); |
| } |
| |
| // erase however many were consumed |
| longbuf_.erase(longbuf_.begin(), longbuf_.begin() + consumed_total); |
| } |
| break; |
| } |
| |
| return commands_vec; |
| } |
| |
| private: |
| bool IsTextMode() const { |
| return params_.format_text; |
| } |
| |
| PrefetcherForkParameters params_; |
| |
| // A buffer long enough to contain a lot of buffers. |
| // This handles reads that only contain a partial command. |
| std::deque<char> longbuf_; |
| |
| // File descriptor buffers. |
| std::deque<int> longbuf_fds_; |
| }; |
| |
| static constexpr bool kDebugCommandRead = true; |
| |
| #define DEBUG_READ if (kDebugCommandRead) LOG(VERBOSE) << "Command::Read " |
| |
| std::optional<Command> Command::Read(char* buf, size_t buf_size, /*out*/size_t* consumed_bytes) { |
| *consumed_bytes = 0; |
| if (buf == nullptr) { |
| return std::nullopt; |
| } |
| |
| Command cmd{}; // zero-initialize any unused fields |
| ParseResult<CommandChoice> parsed_choice = ParsingRead<CommandChoice>(buf, buf_size); |
| cmd.choice = parsed_choice.value; |
| |
| if (!parsed_choice) { |
| DEBUG_READ << "no choice"; |
| return std::nullopt; |
| } |
| |
| switch (parsed_choice.value) { |
| case CommandChoice::kRegisterFilePath: { |
| ParseResult<uint32_t> parsed_session_id = ParsingRead<uint32_t>(parsed_choice); |
| if (!parsed_session_id) { |
| DEBUG_READ << "no parsed session id"; |
| return std::nullopt; |
| } |
| |
| ParseResult<uint32_t> parsed_id = ParsingRead<uint32_t>(parsed_session_id); |
| if (!parsed_id) { |
| DEBUG_READ << "no parsed id"; |
| return std::nullopt; |
| } |
| |
| ParseResult<std::string> parsed_file_path = ParsingRead<std::string>(parsed_id); |
| |
| if (!parsed_file_path) { |
| DEBUG_READ << "no file path"; |
| return std::nullopt; |
| } |
| *consumed_bytes = parsed_file_path.next_token - buf; |
| |
| cmd.session_id = parsed_session_id.value; |
| cmd.id = parsed_id.value; |
| cmd.file_path = parsed_file_path.value; |
| |
| break; |
| } |
| case CommandChoice::kUnregisterFilePath: { |
| ParseResult<uint32_t> parsed_session_id = ParsingRead<uint32_t>(parsed_choice); |
| if (!parsed_session_id) { |
| DEBUG_READ << "no parsed session id"; |
| return std::nullopt; |
| } |
| |
| ParseResult<uint32_t> parsed_id = ParsingRead<uint32_t>(parsed_session_id); |
| if (!parsed_id) { |
| DEBUG_READ << "no parsed id"; |
| return std::nullopt; |
| } |
| *consumed_bytes = parsed_id.next_token - buf; |
| |
| cmd.session_id = parsed_session_id.value; |
| cmd.id = parsed_id.value; |
| |
| break; |
| } |
| case CommandChoice::kReadAhead: { |
| ParseResult<uint32_t> parsed_session_id = ParsingRead<uint32_t>(parsed_choice); |
| if (!parsed_session_id) { |
| DEBUG_READ << "no parsed session id"; |
| return std::nullopt; |
| } |
| |
| ParseResult<uint32_t> parsed_id = ParsingRead<uint32_t>(parsed_session_id); |
| if (!parsed_id) { |
| DEBUG_READ << "no parsed id"; |
| return std::nullopt; |
| } |
| |
| ParseResult<ReadAheadKind> parsed_kind = ParsingRead<ReadAheadKind>(parsed_id); |
| if (!parsed_kind) { |
| DEBUG_READ << "no parsed kind"; |
| return std::nullopt; |
| } |
| ParseResult<uint64_t> parsed_length = ParsingRead<uint64_t>(parsed_kind); |
| if (!parsed_length) { |
| DEBUG_READ << "no parsed length"; |
| return std::nullopt; |
| } |
| ParseResult<uint64_t> parsed_offset = ParsingRead<uint64_t>(parsed_length); |
| if (!parsed_offset) { |
| DEBUG_READ << "no parsed offset"; |
| return std::nullopt; |
| } |
| *consumed_bytes = parsed_offset.next_token - buf; |
| |
| cmd.session_id = parsed_session_id.value; |
| cmd.id = parsed_id.value; |
| cmd.read_ahead_kind = parsed_kind.value; |
| cmd.length = parsed_length.value; |
| cmd.offset = parsed_offset.value; |
| |
| break; |
| } |
| case CommandChoice::kCreateSession: |
| case CommandChoice::kCreateFdSession: { |
| ParseResult<uint32_t> parsed_session_id = ParsingRead<uint32_t>(parsed_choice); |
| if (!parsed_session_id) { |
| DEBUG_READ << "no parsed session id"; |
| return std::nullopt; |
| } |
| |
| ParseResult<std::string> parsed_description = ParsingRead<std::string>(parsed_session_id); |
| |
| if (!parsed_description) { |
| DEBUG_READ << "no description"; |
| return std::nullopt; |
| } |
| *consumed_bytes = parsed_description.next_token - buf; |
| |
| cmd.session_id = parsed_session_id.value; |
| cmd.file_path = parsed_description.value; |
| |
| break; |
| } |
| case CommandChoice::kDestroySession: |
| case CommandChoice::kDumpSession: { |
| ParseResult<uint32_t> parsed_session_id = ParsingRead<uint32_t>(parsed_choice); |
| if (!parsed_session_id) { |
| DEBUG_READ << "no parsed session id"; |
| return std::nullopt; |
| } |
| |
| *consumed_bytes = parsed_session_id.next_token - buf; |
| |
| cmd.session_id = parsed_session_id.value; |
| |
| break; |
| } |
| case CommandChoice::kExit: |
| case CommandChoice::kDumpEverything: |
| *consumed_bytes = parsed_choice.next_token - buf; |
| // Only need to parse the choice. |
| break; |
| default: |
| LOG(FATAL) << "unrecognized command number " << static_cast<uint32_t>(parsed_choice.value); |
| break; |
| } |
| |
| return cmd; |
| } |
| |
| bool Command::Write(char* buf, size_t buf_size, /*out*/size_t* produced_bytes) const { |
| *produced_bytes = 0; |
| if (buf == nullptr) { |
| LOG(WARNING) << "null buf, is this expected?"; |
| return false; |
| } |
| |
| bool has_enough_space = false; |
| size_t space_requirement = std::numeric_limits<size_t>::max(); |
| |
| space_requirement = sizeof(choice); |
| |
| switch (choice) { |
| case CommandChoice::kRegisterFilePath: |
| space_requirement += sizeof(session_id); |
| space_requirement += sizeof(id); |
| space_requirement += sizeof(uint32_t); // string length |
| |
| if (!file_path) { |
| LOG(WARNING) << "Missing file path for kRegisterFilePath"; |
| return false; |
| } |
| |
| space_requirement += file_path->size(); // string contents |
| break; |
| case CommandChoice::kUnregisterFilePath: |
| space_requirement += sizeof(session_id); |
| space_requirement += sizeof(id); |
| break; |
| case CommandChoice::kReadAhead: |
| space_requirement += sizeof(session_id); |
| space_requirement += sizeof(id); |
| space_requirement += sizeof(read_ahead_kind); |
| space_requirement += sizeof(length); |
| space_requirement += sizeof(offset); |
| break; |
| case CommandChoice::kCreateSession: |
| case CommandChoice::kCreateFdSession: |
| space_requirement += sizeof(session_id); |
| space_requirement += sizeof(uint32_t); // string length |
| |
| if (!file_path) { |
| LOG(WARNING) << "Missing file path for kCreateSession"; |
| return false; |
| } |
| |
| space_requirement += file_path->size(); // string contents |
| break; |
| case CommandChoice::kDestroySession: |
| case CommandChoice::kDumpSession: |
| space_requirement += sizeof(session_id); |
| break; |
| case CommandChoice::kExit: |
| case CommandChoice::kDumpEverything: |
| // Only need space for the choice. |
| break; |
| default: |
| LOG(FATAL) << "unrecognized command number " << static_cast<uint32_t>(choice); |
| break; |
| } |
| |
| if (buf_size < space_requirement) { |
| return false; |
| } |
| |
| *produced_bytes = space_requirement; |
| |
| // Always write out the choice. |
| size_t buf_offset = 0; |
| |
| memcpy(&buf[buf_offset], &choice, sizeof(choice)); |
| buf_offset += sizeof(choice); |
| |
| switch (choice) { |
| case CommandChoice::kRegisterFilePath: |
| memcpy(&buf[buf_offset], &session_id, sizeof(session_id)); |
| buf_offset += sizeof(session_id); |
| memcpy(&buf[buf_offset], &id, sizeof(id)); |
| buf_offset += sizeof(id); |
| |
| { |
| uint32_t string_length = static_cast<uint32_t>(file_path->size()); |
| memcpy(&buf[buf_offset], &string_length, sizeof(string_length)); |
| buf_offset += sizeof(string_length); |
| } |
| |
| DCHECK(file_path.has_value()); |
| |
| memcpy(&buf[buf_offset], file_path->c_str(), file_path->size()); |
| buf_offset += file_path->size(); |
| break; |
| case CommandChoice::kUnregisterFilePath: |
| memcpy(&buf[buf_offset], &session_id, sizeof(session_id)); |
| buf_offset += sizeof(session_id); |
| memcpy(&buf[buf_offset], &id, sizeof(id)); |
| buf_offset += sizeof(id); |
| break; |
| case CommandChoice::kReadAhead: |
| memcpy(&buf[buf_offset], &session_id, sizeof(session_id)); |
| buf_offset += sizeof(session_id); |
| memcpy(&buf[buf_offset], &id, sizeof(id)); |
| buf_offset += sizeof(id); |
| memcpy(&buf[buf_offset], &read_ahead_kind, sizeof(read_ahead_kind)); |
| buf_offset += sizeof(read_ahead_kind); |
| memcpy(&buf[buf_offset], &length, sizeof(length)); |
| buf_offset += sizeof(length); |
| memcpy(&buf[buf_offset], &offset, sizeof(offset)); |
| buf_offset += sizeof(offset); |
| break; |
| case CommandChoice::kCreateSession: |
| case CommandChoice::kCreateFdSession: |
| memcpy(&buf[buf_offset], &session_id, sizeof(session_id)); |
| buf_offset += sizeof(session_id); |
| |
| { |
| uint32_t string_length = static_cast<uint32_t>(file_path->size()); |
| memcpy(&buf[buf_offset], &string_length, sizeof(string_length)); |
| buf_offset += sizeof(string_length); |
| } |
| |
| DCHECK(file_path.has_value()); |
| |
| memcpy(&buf[buf_offset], file_path->c_str(), file_path->size()); |
| buf_offset += file_path->size(); |
| |
| DCHECK_EQ(buf_offset, space_requirement) << *this << ",file_path_size:" << file_path->size(); |
| DCHECK_EQ(buf_offset, *produced_bytes) << *this; |
| |
| break; |
| case CommandChoice::kDestroySession: |
| case CommandChoice::kDumpSession: |
| memcpy(&buf[buf_offset], &session_id, sizeof(session_id)); |
| buf_offset += sizeof(session_id); |
| break; |
| case CommandChoice::kExit: |
| case CommandChoice::kDumpEverything: |
| // Only need to write out the choice. |
| break; |
| default: |
| LOG(FATAL) << "should have fallen out in the above switch" |
| << static_cast<uint32_t>(choice); |
| break; |
| } |
| |
| DCHECK_EQ(buf_offset, space_requirement) << *this; |
| DCHECK_EQ(buf_offset, *produced_bytes) << *this; |
| |
| return true; |
| } |
| |
| class PrefetcherDaemon::Impl { |
| public: |
| std::optional<PrefetcherForkParameters> StartPipesViaFork() { |
| int pipefds[2]; |
| if (pipe(&pipefds[0]) != 0) { |
| PLOG(FATAL) << "Failed to create read/write pipes"; |
| } |
| |
| if (WOULD_LOG(VERBOSE)) { |
| long pipe_size = static_cast<long>(fcntl(pipefds[0], F_GETPIPE_SZ)); |
| if (pipe_size < 0) { |
| PLOG(ERROR) << "Failed to F_GETPIPE_SZ:"; |
| } |
| LOG(VERBOSE) << "StartPipesViaFork: default pipe size: " << pipe_size; |
| } |
| |
| for (int i = 0; i < 2; ++i) { |
| // Default pipe size is usually 64KB. |
| // Increase to 1MB so that iorapd has to rarely run during prefetching. |
| if (fcntl(pipefds[i], F_SETPIPE_SZ, kPipeBufferSize) < 0) { |
| PLOG(FATAL) << "Failed to increase pipe size to max"; |
| } |
| } |
| |
| pipefd_read_ = pipefds[0]; |
| pipefd_write_ = pipefds[1]; |
| |
| PrefetcherForkParameters params; |
| params.input_fd = pipefd_read_; |
| params.output_fd = pipefd_write_; |
| params.format_text = false; |
| params.use_sockets = false; |
| |
| bool res = StartViaFork(params); |
| if (res) { |
| return params; |
| } else { |
| return std::nullopt; |
| } |
| } |
| |
| std::optional<PrefetcherForkParameters> StartSocketViaFork() { |
| int socket_fds[2]; |
| if (socketpair(AF_UNIX, SOCK_STREAM, /*protocol*/0, &socket_fds[0]) != 0) { |
| PLOG(FATAL) << "Failed to create read/write socketpair"; |
| } |
| |
| pipefd_read_ = socket_fds[0]; // iorapd writer, iorap.prefetcherd reader |
| pipefd_write_ = socket_fds[1]; // iorapd reader, iorap.prefetcherd writer |
| |
| PrefetcherForkParameters params; |
| params.input_fd = pipefd_read_; |
| params.output_fd = pipefd_write_; |
| params.format_text = false; |
| params.use_sockets = true; |
| |
| bool res = StartViaFork(params); |
| if (res) { |
| return params; |
| } else { |
| return std::nullopt; |
| } |
| } |
| |
| bool StartViaFork(PrefetcherForkParameters params) { |
| params_ = params; |
| |
| forked_ = true; |
| child_ = fork(); |
| |
| if (child_ == -1) { |
| LOG(FATAL) << "Failed to fork PrefetcherDaemon"; |
| } else if (child_ > 0) { // we are the caller of this function |
| LOG(DEBUG) << "forked into iorap.prefetcherd, pid = " << child_; |
| |
| return true; |
| } else { |
| // we are the child that was forked. |
| std::stringstream argv; // for logging |
| std::vector<std::string> argv_vec; |
| |
| { |
| std::stringstream s; |
| s << "--input-fd"; |
| argv_vec.push_back(s.str()); |
| |
| std::stringstream s2; |
| s2 << params.input_fd; |
| argv_vec.push_back(s2.str()); |
| |
| argv << " --input-fd" << " " << params.input_fd; |
| } |
| |
| { |
| std::stringstream s; |
| s << "--output-fd"; |
| argv_vec.push_back(s.str()); |
| |
| std::stringstream s2; |
| s2 << params.output_fd; |
| argv_vec.push_back(s2.str()); |
| |
| argv << " --output-fd" << " " << params.output_fd; |
| } |
| |
| |
| if (params.use_sockets) { |
| std::stringstream s; |
| s << "--use-sockets"; |
| argv_vec.push_back(s.str()); |
| |
| argv << " --use-sockets"; |
| } |
| |
| if (WOULD_LOG(VERBOSE)) { |
| std::stringstream s; |
| s << "--verbose"; |
| argv_vec.push_back(s.str()); |
| |
| argv << " --verbose"; |
| } |
| |
| std::unique_ptr<ArgString[]> argv_ptr = common::VecToArgv(kCommandFileName, argv_vec); |
| |
| LOG(DEBUG) << "fork+exec: " << kCommandFileName << " " |
| << argv.str(); |
| execve(kCommandFileName, (char **)argv_ptr.get(), /*envp*/nullptr); |
| // This should never return. |
| _exit(EXIT_FAILURE); |
| } |
| |
| DCHECK(false); |
| return false; |
| } |
| |
| // TODO: Not very useful since this can never return 'true' |
| // -> in the child we would've already execd which loses all this code. |
| bool IsDaemon() { |
| // In the child the pid is always 0. |
| return child_ > 0; |
| } |
| |
| bool Main(PrefetcherForkParameters params) { |
| LOG(VERBOSE) << "PrefetcherDaemon::Main " << params; |
| |
| CommandParser command_parser{std::move(params)}; |
| |
| Command next_command{}; |
| |
| std::vector<Command> many_commands; |
| |
| // Ensure alogd is pre-initialized before installing minijail. |
| LOG(DEBUG) << "Installing minijail"; |
| |
| // Install seccomp filter using libminijail. |
| if (kInstallMiniJail) { |
| MiniJail(); |
| } |
| |
| while (true) { |
| bool eof = false; |
| |
| if (params.use_sockets) { |
| // use recvmsg(2). supports receiving FDs. |
| many_commands = command_parser.ParseSocketCommands(/*out*/eof); |
| } else { |
| // use read(2). does not support receiving FDs. |
| many_commands = command_parser.ParseCommands(/*out*/eof); |
| } |
| |
| if (eof) { |
| LOG(WARNING) << "PrefetcherDaemon got EOF, terminating"; |
| return true; |
| } |
| |
| for (auto& command : many_commands) { |
| if (LogVerboseIpc()) { |
| LOG(VERBOSE) << "PrefetcherDaemon got command: " << command; |
| } |
| |
| if (command.choice == CommandChoice::kExit) { |
| LOG(DEBUG) << "PrefetcherDaemon got kExit command, terminating"; |
| return true; |
| } |
| |
| if (!ReceiveCommand(command)) { |
| // LOG(WARNING) << "PrefetcherDaemon command processing failure: " << command; |
| } |
| |
| // ReceiveCommand should dup to keep the FD. Avoid leaks. |
| if (command.fd.has_value()) { |
| close(*command.fd); |
| } |
| } |
| } |
| |
| LOG(VERBOSE) << "PrefetcherDaemon::Main got exit, terminating"; |
| |
| return true; |
| // Terminate. |
| } |
| |
| Impl(PrefetcherDaemon* daemon) { |
| session_manager_ = SessionManager::CreateManager(SessionKind::kInProcessDirect); |
| DCHECK(session_manager_ != nullptr); |
| }; |
| |
| ~Impl() { |
| // Don't do anything if we never called 'StartViaFork' |
| if (forked_) { |
| if (!IsDaemon()) { |
| int status; |
| waitpid(child_, /*out*/&status, /*options*/0); |
| } else { |
| LOG(WARNING) << "execve should have avoided this path"; |
| // DCHECK(false) << "not possible because the execve would avoid this path"; |
| } |
| } |
| } |
| |
| bool SendCommand(const Command& command) { |
| // Only parent is the sender. |
| DCHECK(forked_); |
| //DCHECK(!IsDaemon()); |
| |
| char buf[1024]; |
| size_t stream_size; |
| if (!command.Write(buf, sizeof(buf), /*out*/&stream_size)) { |
| PLOG(ERROR) << "Failed to serialize command: " << command; |
| return false; |
| } |
| |
| if (LogVerboseIpc()) { |
| LOG(VERBOSE) << "pre-write(fd=" << pipefd_write_ << ", buf=" << buf |
| << ", size=" << stream_size<< ")"; |
| } |
| |
| if (params_.use_sockets) { |
| /* iov contains the normal message (Command) */ |
| struct iovec iov; |
| memset(&iov, 0, sizeof(iov)); |
| iov.iov_base = &buf[0]; |
| iov.iov_len = stream_size; |
| |
| struct msghdr msg; |
| memset(&msg, 0, sizeof(msg)); |
| |
| /* point to iov to transmit */ |
| msg.msg_iov = &iov; |
| msg.msg_iovlen = 1; |
| |
| /* no dest address; socket is connected */ |
| msg.msg_name = nullptr; |
| msg.msg_namelen = 0; |
| |
| // append a CMSG with SCM_RIGHTS if we have an FD. |
| if (command.fd.has_value()) { |
| union { |
| struct cmsghdr cmh; |
| char control[CMSG_SPACE(sizeof(int))]; /* sized to hold an fd (int) */ |
| } control_un; |
| memset(&control_un, 0, sizeof(control_un)); |
| |
| msg.msg_control = &control_un.control[0]; |
| msg.msg_controllen = sizeof(control_un.control); |
| |
| struct cmsghdr *hp; |
| hp = CMSG_FIRSTHDR(&msg); |
| hp->cmsg_len = CMSG_LEN(sizeof(int)); |
| hp->cmsg_level = SOL_SOCKET; |
| hp->cmsg_type = SCM_RIGHTS; |
| *((int *) CMSG_DATA(hp)) = *(command.fd); |
| |
| DCHECK(command.RequiresFd()) << command; |
| |
| if (LogVerboseIpc()) { |
| LOG(VERBOSE) << "append FD to sendmsg: " << *(command.fd); |
| } |
| } |
| |
| // TODO: add CMSG for the FD passage. |
| |
| if (TEMP_FAILURE_RETRY(sendmsg(pipefd_write_, &msg, /*flags*/0)) < 0) { |
| PLOG(ERROR) << "Failed to sendmsg command: " << command; |
| return false; |
| } |
| } else { |
| if (TEMP_FAILURE_RETRY(write(pipefd_write_, buf, stream_size)) < 0) { |
| PLOG(ERROR) << "Failed to write command: " << command; |
| return false; |
| } |
| } |
| |
| if (LogVerboseIpc()) { |
| LOG(VERBOSE) << "write(fd=" << pipefd_write_ << ", buf=" << buf |
| << ", size=" << stream_size<< ")"; |
| } |
| |
| // TODO: also read the reply? |
| return true; |
| } |
| |
| bool ReceiveCommand(const Command& command) { |
| // Only child is the command receiver. |
| // DCHECK(IsDaemon()); |
| |
| switch (command.choice) { |
| case CommandChoice::kRegisterFilePath: { |
| std::shared_ptr<Session> session = session_manager_->FindSession(command.session_id); |
| |
| if (!session) { |
| LOG(ERROR) << "ReceiveCommand: Could not find session for command: " << command; |
| return false; |
| } |
| |
| CHECK(command.file_path.has_value()) << command; |
| return session->RegisterFilePath(command.id, *command.file_path); |
| } |
| case CommandChoice::kUnregisterFilePath: { |
| std::shared_ptr<Session> session = session_manager_->FindSession(command.session_id); |
| |
| if (!session) { |
| LOG(ERROR) << "ReceiveCommand: Could not find session for command: " << command; |
| return false; |
| } |
| |
| return session->UnregisterFilePath(command.id); |
| } |
| case CommandChoice::kReadAhead: { |
| std::shared_ptr<Session> session = session_manager_->FindSession(command.session_id); |
| |
| if (!session) { |
| LOG(ERROR) << "ReceiveCommand: Could not find session for command: " << command; |
| return false; |
| } |
| |
| return session->ReadAhead(command.id, command.read_ahead_kind, command.length, command.offset); |
| } |
| // TODO: unreadahead |
| case CommandChoice::kExit: { |
| LOG(WARNING) << "kExit should be handled earlier."; |
| return true; |
| } |
| case CommandChoice::kCreateSession: { |
| std::shared_ptr<Session> session = session_manager_->FindSession(command.session_id); |
| if (session != nullptr) { |
| LOG(ERROR) << "ReceiveCommand: session for ID already exists: " << command; |
| return false; |
| } |
| CHECK(command.file_path.has_value()) << command; |
| if (session_manager_->CreateSession(command.session_id, /*description*/*command.file_path) |
| == nullptr) { |
| LOG(ERROR) << "ReceiveCommand: Failure to kCreateSession: " << command; |
| return false; |
| } |
| return true; |
| } |
| case CommandChoice::kDestroySession: { |
| if (!session_manager_->DestroySession(command.session_id)) { |
| LOG(ERROR) << "ReceiveCommand: Failure to kDestroySession: " << command; |
| return false; |
| } |
| return true; |
| } |
| case CommandChoice::kDumpSession: { |
| std::shared_ptr<Session> session = session_manager_->FindSession(command.session_id); |
| |
| if (!session) { |
| LOG(ERROR) << "ReceiveCommand: Could not find session for command: " << command; |
| return false; |
| } |
| |
| // TODO: Consider doing dumpsys support somehow? |
| session->Dump(LOG_STREAM(DEBUG), /*multiline*/true); |
| return true; |
| } |
| case CommandChoice::kDumpEverything: { |
| session_manager_->Dump(LOG_STREAM(DEBUG), /*multiline*/true); |
| break; |
| } |
| case CommandChoice::kCreateFdSession: { |
| std::shared_ptr<Session> session = session_manager_->FindSession(command.session_id); |
| if (session != nullptr) { |
| LOG(ERROR) << "ReceiveCommand: session for ID already exists: " << command; |
| return false; |
| } |
| CHECK(command.file_path.has_value()) << command; |
| CHECK(command.fd.has_value()) << command; |
| |
| LOG(VERBOSE) << "ReceiveCommand: kCreateFdSession fd=" << *(command.fd); |
| |
| // TODO: Maybe use CreateFdSession instead? |
| session = |
| session_manager_->CreateSession(command.session_id, |
| /*description*/*command.file_path, |
| command.fd.value()); |
| if (session == nullptr) { |
| LOG(ERROR) << "ReceiveCommand: Failure to kCreateFdSession: " << command; |
| return false; |
| } |
| |
| return session->ProcessFd(*command.fd); |
| } |
| } |
| |
| return true; |
| } |
| |
| pid_t child_; |
| bool forked_; |
| int pipefd_read_; |
| int pipefd_write_; |
| PrefetcherForkParameters params_; |
| // do not ever use an indirect session manager here, as it would cause a lifetime cycle. |
| std::unique_ptr<SessionManager> session_manager_; // direct only. |
| }; |
| |
| PrefetcherDaemon::PrefetcherDaemon() |
| : impl_{new Impl{this}} { |
| LOG(VERBOSE) << "PrefetcherDaemon() constructor"; |
| } |
| |
| bool PrefetcherDaemon::StartViaFork(PrefetcherForkParameters params) { |
| return impl_->StartViaFork(std::move(params)); |
| } |
| |
| |
| std::optional<PrefetcherForkParameters> PrefetcherDaemon::StartPipesViaFork() { |
| return impl_->StartPipesViaFork(); |
| } |
| |
| std::optional<PrefetcherForkParameters> PrefetcherDaemon::StartSocketViaFork() { |
| return impl_->StartSocketViaFork(); |
| } |
| |
| bool PrefetcherDaemon::Main(PrefetcherForkParameters params) { |
| return impl_->Main(params); |
| } |
| |
| bool PrefetcherDaemon::SendCommand(const Command& command) { |
| return impl_->SendCommand(command); |
| } |
| |
| PrefetcherDaemon::~PrefetcherDaemon() { |
| // required for unique_ptr for incomplete types. |
| } |
| |
| } // namespace iorap::prefetcher |