Noah Gold | c286772 | 2022-03-18 16:04:25 -0700 | [diff] [blame] | 1 | // Copyright 2021 The Chromium OS Authors. All rights reserved. |
| 2 | // Use of this source code is governed by a BSD-style license that can be |
| 3 | // found in the LICENSE file. |
| 4 | |
Dennis Kempin | 9fbf498 | 2022-03-21 14:44:36 -0700 | [diff] [blame] | 5 | use std::{ |
| 6 | io::{ |
| 7 | Cursor, Read, Write, {self}, |
| 8 | }, |
| 9 | time::Duration, |
| 10 | }; |
Noah Gold | c286772 | 2022-03-18 16:04:25 -0700 | [diff] [blame] | 11 | |
Richard | 8ec0b3d | 2022-03-28 15:53:02 -0700 | [diff] [blame] | 12 | use crate::descriptor::{AsRawDescriptor, FromRawDescriptor, SafeDescriptor}; |
Noah Gold | c286772 | 2022-03-18 16:04:25 -0700 | [diff] [blame] | 13 | use crate::{ |
Richard | 8ec0b3d | 2022-03-28 15:53:02 -0700 | [diff] [blame] | 14 | platform::{deserialize_with_descriptors, RawDescriptor, SerializeDescriptors}, |
Dennis Kempin | 9fbf498 | 2022-03-21 14:44:36 -0700 | [diff] [blame] | 15 | tube::{Error, RecvTube, Result, SendTube}, |
Richard | 8ec0b3d | 2022-03-28 15:53:02 -0700 | [diff] [blame] | 16 | BlockingMode, CloseNotifier, FramingMode, PollToken, ReadNotifier, StreamChannel, |
Noah Gold | c286772 | 2022-03-18 16:04:25 -0700 | [diff] [blame] | 17 | }; |
Noah Gold | c286772 | 2022-03-18 16:04:25 -0700 | [diff] [blame] | 18 | use data_model::DataInit; |
| 19 | use lazy_static::lazy_static; |
| 20 | use serde::{de::DeserializeOwned, Deserialize, Serialize, Serializer}; |
Dennis Kempin | 9fbf498 | 2022-03-21 14:44:36 -0700 | [diff] [blame] | 21 | use std::{ |
| 22 | mem, |
| 23 | os::windows::io::{AsRawHandle, RawHandle}, |
Noah Gold | c286772 | 2022-03-18 16:04:25 -0700 | [diff] [blame] | 24 | }; |
| 25 | use winapi::shared::winerror::ERROR_MORE_DATA; |
| 26 | |
/// Bidirectional tube that supports both send and recv.
///
/// NOTE: serializing this type across processes is slightly involved. Suppose there is a Tube pair
/// (A, B). We wish to send B to another process, and communicate with it using A from the current
/// process:
/// 1. B's target_pid must be set to the current PID *before* serialization. There is a
///    serialization hook that fills it in automatically if target_pid is unset (using the alias
///    PID when one has been registered, otherwise the current PID).
/// 2. A's target_pid must be set to the PID of the process where B was sent.
///
/// If instead you are sending both A and B to separate processes, then:
/// 1. A's target_pid must be set to B's pid, manually.
/// 2. B's target_pid must be set to A's pid, manually.
///
/// Automating all of this and getting a completely clean interface is tricky. We would need to
/// intercept the serialization of Tubes in any part of Serde messages, and use Weak refs to sync
/// state about PIDs between the ends. There are alternatives like reusing the underlying
/// StreamChannel to share PIDs, or having a separate pipe just for this purpose; however, we've yet
/// to find a compelling solution that isn't a mess to implement. Suggestions are welcome.
#[derive(Serialize, Deserialize, Debug)]
pub struct Tube {
    // Channel that carries the packets; `Tube::pair` creates it in blocking, message-framed mode.
    socket: StreamChannel,

    // PID of the process holding the other end, if known. It is passed to the handle duplication
    // code when a message carrying descriptors is sent.
    // Default target_pid to current PID on serialization (see `Tube` comment header for details).
    #[serde(serialize_with = "set_tube_pid_on_serialize")]
    target_pid: Option<u32>,
}
| 53 | |
| 54 | /// For a Tube which has not had its target_pid set, when it is serialized, we should automatically |
| 55 | /// default it to the current process, because the other end will be in the current process. |
| 56 | fn set_tube_pid_on_serialize<S>( |
| 57 | existing_pid_value: &Option<u32>, |
| 58 | serializer: S, |
| 59 | ) -> std::result::Result<S::Ok, S::Error> |
| 60 | where |
| 61 | S: Serializer, |
| 62 | { |
| 63 | match existing_pid_value { |
| 64 | Some(pid) => serializer.serialize_u32(*pid), |
| 65 | None => serializer.serialize_u32(ALIAS_PID.lock().unwrap_or(std::process::id())), |
| 66 | } |
| 67 | } |
| 68 | |
/// Fixed-size header written at the start of every Tube packet.
///
/// It tells the receiver how many bytes to read for the message JSON and for the descriptor JSON
/// that follow it.
// NOTE(review): both fields are `usize`, so the header size depends on the pointer width of the
// process that wrote it — confirm both ends are always built for the same width.
#[derive(Copy, Clone, Debug)]
#[repr(C)]
struct MsgHeader {
    // Length in bytes of the serialized message JSON.
    msg_json_size: usize,
    // Length in bytes of the serialized descriptor list; 0 when the message has no descriptors.
    descriptor_json_size: usize,
}

// Safe because it only has data and has no implicit padding.
unsafe impl DataInit for MsgHeader {}
| 78 | |
lazy_static! {
    // Optional tube to which handle duplication is delegated. Installed by
    // `set_duplicate_handle_tube` and consulted by `duplicate_handle` when a target PID is known.
    static ref DH_TUBE: sync::Mutex<Option<DuplicateHandleTube>> = sync::Mutex::new(None);
    // Optional PID written into serialized Tubes that have no target PID of their own. Installed
    // by `set_alias_pid`.
    static ref ALIAS_PID: sync::Mutex<Option<u32>> = sync::Mutex::new(None);
}
| 83 | |
| 84 | /// Set a tube to delegate duplicate handle calls. |
| 85 | pub fn set_duplicate_handle_tube(dh_tube: DuplicateHandleTube) { |
| 86 | DH_TUBE.lock().replace(dh_tube); |
| 87 | } |
| 88 | |
| 89 | /// Set alias pid for use with a DuplicateHandleTube. |
| 90 | pub fn set_alias_pid(alias_pid: u32) { |
| 91 | ALIAS_PID.lock().replace(alias_pid); |
| 92 | } |
| 93 | |
| 94 | impl Tube { |
| 95 | /// Create a pair of connected tubes. Request is sent in one direction while response is |
| 96 | /// received in the other direction. |
| 97 | /// The result is in the form (server, client). |
| 98 | pub fn pair() -> Result<(Tube, Tube)> { |
| 99 | let (socket1, socket2) = StreamChannel::pair(BlockingMode::Blocking, FramingMode::Message) |
| 100 | .map_err(|e| Error::Pair(io::Error::from_raw_os_error(e.errno())))?; |
| 101 | |
| 102 | Ok((Tube::new(socket1), Tube::new(socket2))) |
| 103 | } |
| 104 | |
| 105 | /// Create a pair of connected tubes with the specified buffer size. |
| 106 | /// Request is sent in one direction while response is received in the other direction. |
| 107 | /// The result is in the form (server, client). |
| 108 | pub fn pair_with_buffer_size(buffer_size: usize) -> Result<(Tube, Tube)> { |
| 109 | let (socket1, socket2) = StreamChannel::pair_with_buffer_size( |
| 110 | BlockingMode::Blocking, |
| 111 | FramingMode::Message, |
| 112 | buffer_size, |
| 113 | ) |
| 114 | .map_err(|e| Error::Pair(io::Error::from_raw_os_error(e.errno())))?; |
| 115 | let tube1 = Tube::new(socket1); |
| 116 | let tube2 = Tube::new(socket2); |
| 117 | Ok((tube1, tube2)) |
| 118 | } |
| 119 | |
| 120 | // Create a new `Tube`. |
| 121 | pub fn new(socket: StreamChannel) -> Tube { |
| 122 | Tube { |
| 123 | socket, |
| 124 | target_pid: None, |
| 125 | } |
| 126 | } |
| 127 | |
| 128 | pub(super) fn try_clone(&self) -> Result<Self> { |
| 129 | Ok(Tube { |
| 130 | socket: self.socket.try_clone().map_err(Error::Clone)?, |
| 131 | target_pid: self.target_pid, |
| 132 | }) |
| 133 | } |
| 134 | |
| 135 | pub fn send<T: Serialize>(&self, msg: &T) -> Result<()> { |
| 136 | serialize_and_send(|buf| self.socket.write_immutable(buf), msg, self.target_pid) |
| 137 | } |
| 138 | |
| 139 | pub fn recv<T: DeserializeOwned>(&self) -> Result<T> { |
Vikram Auradkar | de1f006 | 2022-03-25 13:44:28 -0700 | [diff] [blame] | 140 | deserialize_and_recv(|buf| (&self.socket).read(buf)) |
Noah Gold | c286772 | 2022-03-18 16:04:25 -0700 | [diff] [blame] | 141 | } |
| 142 | |
| 143 | /// NOTE: On Windows this will only succeed if called on a server pipe. See #pair |
| 144 | /// documentation to ensure you have a server pipe before calling. |
| 145 | #[cfg(windows)] |
| 146 | pub fn flush_blocking(&mut self) -> Result<()> { |
| 147 | self.socket.flush_blocking().map_err(Error::Flush) |
| 148 | } |
| 149 | |
| 150 | /// For Tubes that span processes, this method must be used to set the PID of the other end |
| 151 | /// of the Tube, otherwise sending handles to the other end won't work. |
| 152 | pub fn set_target_pid(&mut self, target_pid: u32) { |
| 153 | self.target_pid = Some(target_pid); |
| 154 | } |
| 155 | |
| 156 | /// Returns the PID of the process at the other end of the Tube, if any is set. |
| 157 | pub fn target_pid(&self) -> Option<u32> { |
| 158 | self.target_pid |
| 159 | } |
| 160 | |
| 161 | /// TODO(b/145998747, b/184398671): this method should be removed. |
| 162 | pub fn set_send_timeout(&self, _timeout: Option<Duration>) -> Result<()> { |
| 163 | unimplemented!("To be removed/refactored upstream."); |
| 164 | } |
| 165 | |
| 166 | /// TODO(b/145998747, b/184398671): this method should be removed. |
| 167 | pub fn set_recv_timeout(&self, _timeout: Option<Duration>) -> Result<()> { |
| 168 | unimplemented!("To be removed/refactored upstream."); |
| 169 | } |
| 170 | } |
| 171 | |
| 172 | pub fn serialize_and_send<T: Serialize, F: Fn(&[u8]) -> io::Result<usize>>( |
| 173 | write_fn: F, |
| 174 | msg: &T, |
| 175 | target_pid: Option<u32>, |
| 176 | ) -> Result<()> { |
| 177 | let msg_serialize = SerializeDescriptors::new(&msg); |
| 178 | let msg_json = serde_json::to_vec(&msg_serialize).map_err(Error::Json)?; |
| 179 | let msg_descriptors = msg_serialize.into_descriptors(); |
| 180 | |
| 181 | let mut duped_descriptors = Vec::with_capacity(msg_descriptors.len()); |
| 182 | for desc in msg_descriptors { |
| 183 | // Safe because these handles are guaranteed to be valid. Details: |
| 184 | // 1. They come from sys_util::descriptor_reflection::with_as_descriptor. |
| 185 | // 2. with_as_descriptor is intended to be applied to owned descriptor types (e.g. File, |
| 186 | // SafeDescriptor). |
| 187 | // 3. The owning object is borrowed by msg until sending is complete. |
| 188 | duped_descriptors.push(duplicate_handle(desc, target_pid)? as usize) |
| 189 | } |
| 190 | |
| 191 | let descriptor_json = if duped_descriptors.is_empty() { |
| 192 | None |
| 193 | } else { |
| 194 | Some(serde_json::to_vec(&duped_descriptors).map_err(Error::Json)?) |
| 195 | }; |
| 196 | |
| 197 | let header = MsgHeader { |
| 198 | msg_json_size: msg_json.len(), |
| 199 | descriptor_json_size: descriptor_json.as_ref().map_or(0, |json| json.len()), |
| 200 | }; |
| 201 | |
| 202 | let mut data_packet = Cursor::new(Vec::with_capacity( |
| 203 | header.as_slice().len() + header.msg_json_size + header.descriptor_json_size, |
| 204 | )); |
| 205 | data_packet |
| 206 | .write(header.as_slice()) |
| 207 | .map_err(Error::SendIoBuf)?; |
| 208 | data_packet |
| 209 | .write(msg_json.as_slice()) |
| 210 | .map_err(Error::SendIoBuf)?; |
| 211 | if let Some(descriptor_json) = descriptor_json { |
| 212 | data_packet |
| 213 | .write(descriptor_json.as_slice()) |
| 214 | .map_err(Error::SendIoBuf)?; |
| 215 | } |
| 216 | |
| 217 | // Multiple writers (producers) are safe because each write is atomic. |
| 218 | let data_bytes = data_packet.into_inner(); |
| 219 | |
| 220 | write_fn(&data_bytes).map_err(Error::SendIo)?; |
| 221 | Ok(()) |
| 222 | } |
| 223 | |
| 224 | fn duplicate_handle(desc: RawHandle, target_pid: Option<u32>) -> Result<RawHandle> { |
| 225 | match target_pid { |
| 226 | Some(pid) => match &*DH_TUBE.lock() { |
| 227 | Some(tube) => tube.request_duplicate_handle(pid, desc), |
| 228 | None => { |
| 229 | win_util::duplicate_handle_with_target_pid(desc, pid).map_err(Error::DupDescriptor) |
| 230 | } |
| 231 | }, |
| 232 | None => win_util::duplicate_handle(desc).map_err(Error::DupDescriptor), |
| 233 | } |
| 234 | } |
| 235 | |
| 236 | /// Reads a part of a Tube packet asserting that it was correctly read. This means: |
| 237 | /// * Treats partial "message" (transport framing) reads are Ok, as long as we filled our buffer. |
| 238 | /// We use this to ignore errors when reading the message header, which has the lengths we need |
| 239 | /// to allocate our buffers for the remainder of the message. |
| 240 | /// * We filled the supplied buffer. |
| 241 | fn perform_read<F: Fn(&mut [u8]) -> io::Result<usize>>( |
| 242 | read_fn: &F, |
| 243 | buf: &mut [u8], |
| 244 | ) -> io::Result<usize> { |
| 245 | let res = match read_fn(buf) { |
| 246 | Ok(s) => Ok(s), |
| 247 | Err(e) |
| 248 | if e.raw_os_error() |
| 249 | .map_or(false, |errno| errno == ERROR_MORE_DATA as i32) => |
| 250 | { |
| 251 | Ok(buf.len()) |
| 252 | } |
| 253 | Err(e) => Err(e), |
| 254 | }; |
| 255 | |
| 256 | let bytes_read = res?; |
| 257 | if bytes_read != buf.len() { |
| 258 | Err(io::Error::new( |
| 259 | io::ErrorKind::UnexpectedEof, |
| 260 | "failed to fill whole buffer", |
| 261 | )) |
| 262 | } else { |
| 263 | Ok(bytes_read) |
| 264 | } |
| 265 | } |
| 266 | |
| 267 | /// Deserializes a Tube packet by calling the supplied read function. This function MUST |
| 268 | /// assert that the buffer was filled. |
| 269 | pub fn deserialize_and_recv<T: DeserializeOwned, F: Fn(&mut [u8]) -> io::Result<usize>>( |
| 270 | read_fn: F, |
| 271 | ) -> Result<T> { |
| 272 | let mut header_bytes = vec![0u8; mem::size_of::<MsgHeader>()]; |
| 273 | perform_read(&read_fn, header_bytes.as_mut_slice()).map_err(Error::Recv)?; |
| 274 | |
| 275 | // Safe because the header is always written by the send function, and only that function |
| 276 | // writes to this channel. |
| 277 | let header = |
| 278 | MsgHeader::from_slice(header_bytes.as_slice()).expect("Tube header failed to deserialize."); |
| 279 | |
| 280 | let mut msg_json = vec![0u8; header.msg_json_size]; |
| 281 | perform_read(&read_fn, msg_json.as_mut_slice()).map_err(Error::Recv)?; |
| 282 | |
| 283 | if msg_json.is_empty() { |
| 284 | // This means we got a message header, but there is no json body (due to a zero size in |
| 285 | // the header). This should never happen because it means the receiver is getting no |
| 286 | // data whatsoever from the sender. |
| 287 | return Err(Error::RecvUnexpectedEmptyBody); |
| 288 | } |
| 289 | |
| 290 | let msg_descriptors: Vec<RawDescriptor> = if header.descriptor_json_size > 0 { |
| 291 | let mut msg_descriptors_json = vec![0u8; header.descriptor_json_size]; |
| 292 | perform_read(&read_fn, msg_descriptors_json.as_mut_slice()).map_err(Error::Recv)?; |
| 293 | let descriptor_usizes: Vec<usize> = |
| 294 | serde_json::from_slice(msg_descriptors_json.as_slice()).map_err(Error::Json)?; |
| 295 | |
| 296 | // Safe because the usizes are RawDescriptors that were converted to usize in the send |
| 297 | // method. |
| 298 | descriptor_usizes |
| 299 | .iter() |
| 300 | .map(|item| *item as RawDescriptor) |
| 301 | .collect() |
| 302 | } else { |
| 303 | Vec::new() |
| 304 | }; |
| 305 | |
| 306 | let mut msg_descriptors_safe = msg_descriptors |
| 307 | .into_iter() |
| 308 | .map(|v| { |
| 309 | Some(unsafe { |
| 310 | // Safe because the socket returns new fds that are owned locally by this scope. |
| 311 | SafeDescriptor::from_raw_descriptor(v) |
| 312 | }) |
| 313 | }) |
| 314 | .collect(); |
| 315 | |
| 316 | deserialize_with_descriptors( |
| 317 | || serde_json::from_slice(&msg_json), |
| 318 | &mut msg_descriptors_safe, |
| 319 | ) |
| 320 | .map_err(Error::Json) |
| 321 | } |
| 322 | |
// Poll token with a single variant, `SocketReady`.
// NOTE(review): nothing in the visible part of this file references this enum — confirm whether
// it is still needed.
#[derive(PollToken, Eq, PartialEq, Copy, Clone)]
enum Token {
    SocketReady,
}
| 327 | |
| 328 | impl AsRawDescriptor for Tube { |
| 329 | fn as_raw_descriptor(&self) -> RawDescriptor { |
| 330 | self.socket.as_raw_descriptor() |
| 331 | } |
| 332 | } |
| 333 | |
| 334 | impl AsRawHandle for Tube { |
| 335 | fn as_raw_handle(&self) -> RawHandle { |
| 336 | self.as_raw_descriptor() |
| 337 | } |
| 338 | } |
| 339 | |
| 340 | impl ReadNotifier for Tube { |
| 341 | fn get_read_notifier(&self) -> &dyn AsRawDescriptor { |
| 342 | self.socket.get_read_notifier() |
| 343 | } |
| 344 | } |
| 345 | |
| 346 | impl CloseNotifier for Tube { |
| 347 | fn get_close_notifier(&self) -> &dyn AsRawDescriptor { |
| 348 | self.socket.get_close_notifier() |
| 349 | } |
| 350 | } |
| 351 | |
| 352 | impl AsRawHandle for SendTube { |
| 353 | fn as_raw_handle(&self) -> RawHandle { |
| 354 | self.0.as_raw_descriptor() |
| 355 | } |
| 356 | } |
| 357 | |
| 358 | impl AsRawHandle for RecvTube { |
| 359 | fn as_raw_handle(&self) -> RawHandle { |
| 360 | self.0.as_raw_descriptor() |
| 361 | } |
| 362 | } |
| 363 | |
/// A request to duplicate a handle to a target process.
///
/// Sent by `DuplicateHandleTube::request_duplicate_handle`.
#[derive(Serialize, Deserialize, Debug)]
pub struct DuplicateHandleRequest {
    /// PID (or alias PID) identifying the process the handle should be duplicated into.
    pub target_alias_pid: u32,
    /// The raw handle to duplicate, carried as an integer.
    pub handle: usize,
}
| 370 | |
/// Contains a duplicated handle or None if an error occurred.
///
/// Received by `DuplicateHandleTube::request_duplicate_handle` in reply to a
/// `DuplicateHandleRequest`.
#[derive(Serialize, Deserialize, Debug)]
pub struct DuplicateHandleResponse {
    /// The duplicated handle carried as an integer; `None` is reported to the requester as
    /// `Error::BrokerDupDescriptor`.
    pub handle: Option<usize>,
}
| 376 | |
/// Wrapper for a tube which is used to delegate DuplicateHandle function calls to
/// the broker process.
///
/// Each call sends one `DuplicateHandleRequest` over the wrapped tube and then waits for one
/// `DuplicateHandleResponse`.
#[derive(Serialize, Deserialize, Debug)]
pub struct DuplicateHandleTube(Tube);
| 381 | |
| 382 | impl DuplicateHandleTube { |
| 383 | pub fn new(tube: Tube) -> Self { |
| 384 | Self(tube) |
| 385 | } |
| 386 | |
| 387 | pub fn request_duplicate_handle( |
| 388 | &self, |
| 389 | target_alias_pid: u32, |
| 390 | handle: RawHandle, |
| 391 | ) -> Result<RawHandle> { |
| 392 | let req = DuplicateHandleRequest { |
| 393 | target_alias_pid, |
| 394 | handle: handle as usize, |
| 395 | }; |
| 396 | self.0.send(&req)?; |
| 397 | let res: DuplicateHandleResponse = self.0.recv()?; |
| 398 | res.handle |
| 399 | .map(|h| h as RawHandle) |
| 400 | .ok_or(Error::BrokerDupDescriptor) |
| 401 | } |
| 402 | } |
| 403 | |
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{EventContext, EventTrigger, PollToken, ReadNotifier};
    use std::time;

    // Upper bound on how long the test waits for the receiving end to become readable.
    const EVENT_WAIT_TIME: time::Duration = time::Duration::from_secs(10);

    #[derive(PollToken, Debug, Eq, PartialEq, Copy, Clone)]
    enum Token {
        ReceivedData,
    }

    /// A Tube that went through a serialize/deserialize round trip must still be able to deliver
    /// a message to its original peer.
    #[test]
    fn test_serialize_tube() {
        let (sender, receiver) = Tube::pair().unwrap();

        // Watch the receiving end so we can tell when the message has arrived.
        let trigger = EventTrigger::from(receiver.get_read_notifier(), Token::ReceivedData);
        let event_ctx: EventContext<Token> = EventContext::build_with(&[trigger]).unwrap();

        // Serialize the Tube
        let wrapped = SerializeDescriptors::new(&sender);
        let tube_json = serde_json::to_vec(&wrapped).unwrap();
        let raw_descriptors = wrapped.into_descriptors();

        // Deserialize the Tube
        let mut owned_descriptors = raw_descriptors
            .into_iter()
            .map(|raw| {
                let descriptor = unsafe { SafeDescriptor::from_raw_descriptor(raw) };
                Some(descriptor)
            })
            .collect();
        let restored: Tube = deserialize_with_descriptors(
            || serde_json::from_slice(&tube_json),
            &mut owned_descriptors,
        )
        .unwrap();

        // Send a message through deserialized Tube
        let greeting = "hi".to_string();
        restored.send(&greeting).unwrap();

        let ready_events = event_ctx.wait_timeout(EVENT_WAIT_TIME).unwrap();
        assert_eq!(ready_events.len(), 1);
        let received = receiver.recv::<String>().unwrap();
        assert_eq!(received, "hi");
    }
}