blob: 3f59ffa00af1e9caa45d5078fdb942d7ed035d2a [file] [log] [blame]
// Copyright 2021 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use std::{
io::{
Cursor, Read, Write, {self},
},
time::Duration,
};
use crate::descriptor::{AsRawDescriptor, FromRawDescriptor, SafeDescriptor};
use crate::{
platform::{deserialize_with_descriptors, RawDescriptor, SerializeDescriptors},
tube::{Error, RecvTube, Result, SendTube},
BlockingMode, CloseNotifier, FramingMode, PollToken, ReadNotifier, StreamChannel,
};
use data_model::DataInit;
use lazy_static::lazy_static;
use serde::{de::DeserializeOwned, Deserialize, Serialize, Serializer};
use std::{
mem,
os::windows::io::{AsRawHandle, RawHandle},
};
use winapi::shared::winerror::ERROR_MORE_DATA;
/// Bidirectional tube that support both send and recv.
///
/// NOTE: serializing this type across processes is slightly involved. Suppose there is a Tube pair
/// (A, B). We wish to send B to another process, and communicate with it using A from the current
/// process:
/// 1. B's target_pid must be set to the current PID *before* serialization. There is a
/// serialization hook that sets it to the current PID automatically if target_pid is unset.
/// 2. A's target_pid must be set to the PID of the process where B was sent.
///
/// If instead you are sending both A and B to separate processes, then:
/// 1. A's target_pid must be set to B's pid, manually.
/// 2. B's target_pid must be set to A's pid, manually.
///
/// Automating all of this and getting a completely clean interface is tricky. We would need
/// intercept the serialization of Tubes in any part of Serde messages, and use Weak refs to sync
/// state about PIDs between the ends. There are alternatives like reusing the underlying
/// StreamChannel to share PIDs, or having a separate pipe just for this purpose; however, we've yet
/// to find a compelling solution that isn't a mess to implement. Suggestions are welcome.
#[derive(Serialize, Deserialize, Debug)]
pub struct Tube {
socket: StreamChannel,
// Default target_pid to current PID on serialization (see `Tube` comment header for details).
#[serde(serialize_with = "set_tube_pid_on_serialize")]
target_pid: Option<u32>,
}
/// For a Tube which has not had its target_pid set, when it is serialized, we should automatically
/// default it to the current process, because the other end will be in the current process.
fn set_tube_pid_on_serialize<S>(
existing_pid_value: &Option<u32>,
serializer: S,
) -> std::result::Result<S::Ok, S::Error>
where
S: Serializer,
{
match existing_pid_value {
Some(pid) => serializer.serialize_u32(*pid),
None => serializer.serialize_u32(ALIAS_PID.lock().unwrap_or(std::process::id())),
}
}
#[derive(Copy, Clone, Debug)]
#[repr(C)]
struct MsgHeader {
msg_json_size: usize,
descriptor_json_size: usize,
}
// Safe because it only has data and has no implicit padding.
unsafe impl DataInit for MsgHeader {}
lazy_static! {
static ref DH_TUBE: sync::Mutex<Option<DuplicateHandleTube>> = sync::Mutex::new(None);
static ref ALIAS_PID: sync::Mutex<Option<u32>> = sync::Mutex::new(None);
}
/// Set a tube to delegate duplicate handle calls.
pub fn set_duplicate_handle_tube(dh_tube: DuplicateHandleTube) {
DH_TUBE.lock().replace(dh_tube);
}
/// Set alias pid for use with a DuplicateHandleTube.
pub fn set_alias_pid(alias_pid: u32) {
ALIAS_PID.lock().replace(alias_pid);
}
impl Tube {
/// Create a pair of connected tubes. Request is sent in one direction while response is
/// received in the other direction.
/// The result is in the form (server, client).
pub fn pair() -> Result<(Tube, Tube)> {
let (socket1, socket2) = StreamChannel::pair(BlockingMode::Blocking, FramingMode::Message)
.map_err(|e| Error::Pair(io::Error::from_raw_os_error(e.errno())))?;
Ok((Tube::new(socket1), Tube::new(socket2)))
}
/// Create a pair of connected tubes with the specified buffer size.
/// Request is sent in one direction while response is received in the other direction.
/// The result is in the form (server, client).
pub fn pair_with_buffer_size(buffer_size: usize) -> Result<(Tube, Tube)> {
let (socket1, socket2) = StreamChannel::pair_with_buffer_size(
BlockingMode::Blocking,
FramingMode::Message,
buffer_size,
)
.map_err(|e| Error::Pair(io::Error::from_raw_os_error(e.errno())))?;
let tube1 = Tube::new(socket1);
let tube2 = Tube::new(socket2);
Ok((tube1, tube2))
}
// Create a new `Tube`.
pub fn new(socket: StreamChannel) -> Tube {
Tube {
socket,
target_pid: None,
}
}
pub(super) fn try_clone(&self) -> Result<Self> {
Ok(Tube {
socket: self.socket.try_clone().map_err(Error::Clone)?,
target_pid: self.target_pid,
})
}
pub fn send<T: Serialize>(&self, msg: &T) -> Result<()> {
serialize_and_send(|buf| self.socket.write_immutable(buf), msg, self.target_pid)
}
pub fn recv<T: DeserializeOwned>(&self) -> Result<T> {
deserialize_and_recv(|buf| (&self.socket).read(buf))
}
/// NOTE: On Windows this will only succeed if called on a server pipe. See #pair
/// documentation to ensure you have a server pipe before calling.
#[cfg(windows)]
pub fn flush_blocking(&mut self) -> Result<()> {
self.socket.flush_blocking().map_err(Error::Flush)
}
/// For Tubes that span processes, this method must be used to set the PID of the other end
/// of the Tube, otherwise sending handles to the other end won't work.
pub fn set_target_pid(&mut self, target_pid: u32) {
self.target_pid = Some(target_pid);
}
/// Returns the PID of the process at the other end of the Tube, if any is set.
pub fn target_pid(&self) -> Option<u32> {
self.target_pid
}
/// TODO(b/145998747, b/184398671): this method should be removed.
pub fn set_send_timeout(&self, _timeout: Option<Duration>) -> Result<()> {
unimplemented!("To be removed/refactored upstream.");
}
/// TODO(b/145998747, b/184398671): this method should be removed.
pub fn set_recv_timeout(&self, _timeout: Option<Duration>) -> Result<()> {
unimplemented!("To be removed/refactored upstream.");
}
}
pub fn serialize_and_send<T: Serialize, F: Fn(&[u8]) -> io::Result<usize>>(
write_fn: F,
msg: &T,
target_pid: Option<u32>,
) -> Result<()> {
let msg_serialize = SerializeDescriptors::new(&msg);
let msg_json = serde_json::to_vec(&msg_serialize).map_err(Error::Json)?;
let msg_descriptors = msg_serialize.into_descriptors();
let mut duped_descriptors = Vec::with_capacity(msg_descriptors.len());
for desc in msg_descriptors {
// Safe because these handles are guaranteed to be valid. Details:
// 1. They come from sys_util::descriptor_reflection::with_as_descriptor.
// 2. with_as_descriptor is intended to be applied to owned descriptor types (e.g. File,
// SafeDescriptor).
// 3. The owning object is borrowed by msg until sending is complete.
duped_descriptors.push(duplicate_handle(desc, target_pid)? as usize)
}
let descriptor_json = if duped_descriptors.is_empty() {
None
} else {
Some(serde_json::to_vec(&duped_descriptors).map_err(Error::Json)?)
};
let header = MsgHeader {
msg_json_size: msg_json.len(),
descriptor_json_size: descriptor_json.as_ref().map_or(0, |json| json.len()),
};
let mut data_packet = Cursor::new(Vec::with_capacity(
header.as_slice().len() + header.msg_json_size + header.descriptor_json_size,
));
data_packet
.write(header.as_slice())
.map_err(Error::SendIoBuf)?;
data_packet
.write(msg_json.as_slice())
.map_err(Error::SendIoBuf)?;
if let Some(descriptor_json) = descriptor_json {
data_packet
.write(descriptor_json.as_slice())
.map_err(Error::SendIoBuf)?;
}
// Multiple writers (producers) are safe because each write is atomic.
let data_bytes = data_packet.into_inner();
write_fn(&data_bytes).map_err(Error::SendIo)?;
Ok(())
}
fn duplicate_handle(desc: RawHandle, target_pid: Option<u32>) -> Result<RawHandle> {
match target_pid {
Some(pid) => match &*DH_TUBE.lock() {
Some(tube) => tube.request_duplicate_handle(pid, desc),
None => {
win_util::duplicate_handle_with_target_pid(desc, pid).map_err(Error::DupDescriptor)
}
},
None => win_util::duplicate_handle(desc).map_err(Error::DupDescriptor),
}
}
/// Reads a part of a Tube packet asserting that it was correctly read. This means:
/// * Treats partial "message" (transport framing) reads are Ok, as long as we filled our buffer.
/// We use this to ignore errors when reading the message header, which has the lengths we need
/// to allocate our buffers for the remainder of the message.
/// * We filled the supplied buffer.
fn perform_read<F: Fn(&mut [u8]) -> io::Result<usize>>(
read_fn: &F,
buf: &mut [u8],
) -> io::Result<usize> {
let res = match read_fn(buf) {
Ok(s) => Ok(s),
Err(e)
if e.raw_os_error()
.map_or(false, |errno| errno == ERROR_MORE_DATA as i32) =>
{
Ok(buf.len())
}
Err(e) => Err(e),
};
let bytes_read = res?;
if bytes_read != buf.len() {
Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"failed to fill whole buffer",
))
} else {
Ok(bytes_read)
}
}
/// Deserializes a Tube packet by calling the supplied read function. This function MUST
/// assert that the buffer was filled.
pub fn deserialize_and_recv<T: DeserializeOwned, F: Fn(&mut [u8]) -> io::Result<usize>>(
read_fn: F,
) -> Result<T> {
let mut header_bytes = vec![0u8; mem::size_of::<MsgHeader>()];
perform_read(&read_fn, header_bytes.as_mut_slice()).map_err(Error::Recv)?;
// Safe because the header is always written by the send function, and only that function
// writes to this channel.
let header =
MsgHeader::from_slice(header_bytes.as_slice()).expect("Tube header failed to deserialize.");
let mut msg_json = vec![0u8; header.msg_json_size];
perform_read(&read_fn, msg_json.as_mut_slice()).map_err(Error::Recv)?;
if msg_json.is_empty() {
// This means we got a message header, but there is no json body (due to a zero size in
// the header). This should never happen because it means the receiver is getting no
// data whatsoever from the sender.
return Err(Error::RecvUnexpectedEmptyBody);
}
let msg_descriptors: Vec<RawDescriptor> = if header.descriptor_json_size > 0 {
let mut msg_descriptors_json = vec![0u8; header.descriptor_json_size];
perform_read(&read_fn, msg_descriptors_json.as_mut_slice()).map_err(Error::Recv)?;
let descriptor_usizes: Vec<usize> =
serde_json::from_slice(msg_descriptors_json.as_slice()).map_err(Error::Json)?;
// Safe because the usizes are RawDescriptors that were converted to usize in the send
// method.
descriptor_usizes
.iter()
.map(|item| *item as RawDescriptor)
.collect()
} else {
Vec::new()
};
let mut msg_descriptors_safe = msg_descriptors
.into_iter()
.map(|v| {
Some(unsafe {
// Safe because the socket returns new fds that are owned locally by this scope.
SafeDescriptor::from_raw_descriptor(v)
})
})
.collect();
deserialize_with_descriptors(
|| serde_json::from_slice(&msg_json),
&mut msg_descriptors_safe,
)
.map_err(Error::Json)
}
#[derive(PollToken, Eq, PartialEq, Copy, Clone)]
enum Token {
SocketReady,
}
impl AsRawDescriptor for Tube {
fn as_raw_descriptor(&self) -> RawDescriptor {
self.socket.as_raw_descriptor()
}
}
impl AsRawHandle for Tube {
fn as_raw_handle(&self) -> RawHandle {
self.as_raw_descriptor()
}
}
impl ReadNotifier for Tube {
fn get_read_notifier(&self) -> &dyn AsRawDescriptor {
self.socket.get_read_notifier()
}
}
impl CloseNotifier for Tube {
fn get_close_notifier(&self) -> &dyn AsRawDescriptor {
self.socket.get_close_notifier()
}
}
impl AsRawHandle for SendTube {
fn as_raw_handle(&self) -> RawHandle {
self.0.as_raw_descriptor()
}
}
impl AsRawHandle for RecvTube {
fn as_raw_handle(&self) -> RawHandle {
self.0.as_raw_descriptor()
}
}
/// A request to duplicate a handle to a target process.
#[derive(Serialize, Deserialize, Debug)]
pub struct DuplicateHandleRequest {
pub target_alias_pid: u32,
pub handle: usize,
}
/// Contains a duplicated handle or None if an error occurred.
#[derive(Serialize, Deserialize, Debug)]
pub struct DuplicateHandleResponse {
pub handle: Option<usize>,
}
/// Wrapper for tube which is used to delegate DuplicateHandle function calls to
/// the broker process.
#[derive(Serialize, Deserialize, Debug)]
pub struct DuplicateHandleTube(Tube);
impl DuplicateHandleTube {
pub fn new(tube: Tube) -> Self {
Self(tube)
}
pub fn request_duplicate_handle(
&self,
target_alias_pid: u32,
handle: RawHandle,
) -> Result<RawHandle> {
let req = DuplicateHandleRequest {
target_alias_pid,
handle: handle as usize,
};
self.0.send(&req)?;
let res: DuplicateHandleResponse = self.0.recv()?;
res.handle
.map(|h| h as RawHandle)
.ok_or(Error::BrokerDupDescriptor)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{EventContext, EventTrigger, PollToken, ReadNotifier};
use std::time;
const EVENT_WAIT_TIME: time::Duration = time::Duration::from_secs(10);
#[derive(PollToken, Debug, Eq, PartialEq, Copy, Clone)]
enum Token {
ReceivedData,
}
#[test]
fn test_serialize_tube() {
let (tube_1, tube_2) = Tube::pair().unwrap();
let event_ctx: EventContext<Token> = EventContext::build_with(&[EventTrigger::from(
tube_2.get_read_notifier(),
Token::ReceivedData,
)])
.unwrap();
// Serialize the Tube
let msg_serialize = SerializeDescriptors::new(&tube_1);
let serialized = serde_json::to_vec(&msg_serialize).unwrap();
let msg_descriptors = msg_serialize.into_descriptors();
// Deserialize the Tube
let mut msg_descriptors_safe = msg_descriptors
.into_iter()
.map(|v| Some(unsafe { SafeDescriptor::from_raw_descriptor(v) }))
.collect();
let tube_deserialized: Tube = deserialize_with_descriptors(
|| serde_json::from_slice(&serialized),
&mut msg_descriptors_safe,
)
.unwrap();
// Send a message through deserialized Tube
tube_deserialized.send(&"hi".to_string()).unwrap();
assert_eq!(event_ctx.wait_timeout(EVENT_WAIT_TIME).unwrap().len(), 1);
assert_eq!(tube_2.recv::<String>().unwrap(), "hi");
}
}