blob: 3f59ffa00af1e9caa45d5078fdb942d7ed035d2a [file] [log] [blame]
Noah Goldc2867722022-03-18 16:04:25 -07001// Copyright 2021 The Chromium OS Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
Dennis Kempin9fbf4982022-03-21 14:44:36 -07005use std::{
6 io::{
7 Cursor, Read, Write, {self},
8 },
9 time::Duration,
10};
Noah Goldc2867722022-03-18 16:04:25 -070011
Richard8ec0b3d2022-03-28 15:53:02 -070012use crate::descriptor::{AsRawDescriptor, FromRawDescriptor, SafeDescriptor};
Noah Goldc2867722022-03-18 16:04:25 -070013use crate::{
Richard8ec0b3d2022-03-28 15:53:02 -070014 platform::{deserialize_with_descriptors, RawDescriptor, SerializeDescriptors},
Dennis Kempin9fbf4982022-03-21 14:44:36 -070015 tube::{Error, RecvTube, Result, SendTube},
Richard8ec0b3d2022-03-28 15:53:02 -070016 BlockingMode, CloseNotifier, FramingMode, PollToken, ReadNotifier, StreamChannel,
Noah Goldc2867722022-03-18 16:04:25 -070017};
Noah Goldc2867722022-03-18 16:04:25 -070018use data_model::DataInit;
19use lazy_static::lazy_static;
20use serde::{de::DeserializeOwned, Deserialize, Serialize, Serializer};
Dennis Kempin9fbf4982022-03-21 14:44:36 -070021use std::{
22 mem,
23 os::windows::io::{AsRawHandle, RawHandle},
Noah Goldc2867722022-03-18 16:04:25 -070024};
25use winapi::shared::winerror::ERROR_MORE_DATA;
26
27/// Bidirectional tube that support both send and recv.
28///
29/// NOTE: serializing this type across processes is slightly involved. Suppose there is a Tube pair
30/// (A, B). We wish to send B to another process, and communicate with it using A from the current
31/// process:
32/// 1. B's target_pid must be set to the current PID *before* serialization. There is a
33/// serialization hook that sets it to the current PID automatically if target_pid is unset.
34/// 2. A's target_pid must be set to the PID of the process where B was sent.
35///
36/// If instead you are sending both A and B to separate processes, then:
37/// 1. A's target_pid must be set to B's pid, manually.
38/// 2. B's target_pid must be set to A's pid, manually.
39///
40/// Automating all of this and getting a completely clean interface is tricky. We would need
41/// intercept the serialization of Tubes in any part of Serde messages, and use Weak refs to sync
42/// state about PIDs between the ends. There are alternatives like reusing the underlying
43/// StreamChannel to share PIDs, or having a separate pipe just for this purpose; however, we've yet
44/// to find a compelling solution that isn't a mess to implement. Suggestions are welcome.
45#[derive(Serialize, Deserialize, Debug)]
46pub struct Tube {
47 socket: StreamChannel,
48
49 // Default target_pid to current PID on serialization (see `Tube` comment header for details).
50 #[serde(serialize_with = "set_tube_pid_on_serialize")]
51 target_pid: Option<u32>,
52}
53
54/// For a Tube which has not had its target_pid set, when it is serialized, we should automatically
55/// default it to the current process, because the other end will be in the current process.
56fn set_tube_pid_on_serialize<S>(
57 existing_pid_value: &Option<u32>,
58 serializer: S,
59) -> std::result::Result<S::Ok, S::Error>
60where
61 S: Serializer,
62{
63 match existing_pid_value {
64 Some(pid) => serializer.serialize_u32(*pid),
65 None => serializer.serialize_u32(ALIAS_PID.lock().unwrap_or(std::process::id())),
66 }
67}
68
69#[derive(Copy, Clone, Debug)]
70#[repr(C)]
71struct MsgHeader {
72 msg_json_size: usize,
73 descriptor_json_size: usize,
74}
75
76// Safe because it only has data and has no implicit padding.
77unsafe impl DataInit for MsgHeader {}
78
79lazy_static! {
80 static ref DH_TUBE: sync::Mutex<Option<DuplicateHandleTube>> = sync::Mutex::new(None);
81 static ref ALIAS_PID: sync::Mutex<Option<u32>> = sync::Mutex::new(None);
82}
83
84/// Set a tube to delegate duplicate handle calls.
85pub fn set_duplicate_handle_tube(dh_tube: DuplicateHandleTube) {
86 DH_TUBE.lock().replace(dh_tube);
87}
88
89/// Set alias pid for use with a DuplicateHandleTube.
90pub fn set_alias_pid(alias_pid: u32) {
91 ALIAS_PID.lock().replace(alias_pid);
92}
93
94impl Tube {
95 /// Create a pair of connected tubes. Request is sent in one direction while response is
96 /// received in the other direction.
97 /// The result is in the form (server, client).
98 pub fn pair() -> Result<(Tube, Tube)> {
99 let (socket1, socket2) = StreamChannel::pair(BlockingMode::Blocking, FramingMode::Message)
100 .map_err(|e| Error::Pair(io::Error::from_raw_os_error(e.errno())))?;
101
102 Ok((Tube::new(socket1), Tube::new(socket2)))
103 }
104
105 /// Create a pair of connected tubes with the specified buffer size.
106 /// Request is sent in one direction while response is received in the other direction.
107 /// The result is in the form (server, client).
108 pub fn pair_with_buffer_size(buffer_size: usize) -> Result<(Tube, Tube)> {
109 let (socket1, socket2) = StreamChannel::pair_with_buffer_size(
110 BlockingMode::Blocking,
111 FramingMode::Message,
112 buffer_size,
113 )
114 .map_err(|e| Error::Pair(io::Error::from_raw_os_error(e.errno())))?;
115 let tube1 = Tube::new(socket1);
116 let tube2 = Tube::new(socket2);
117 Ok((tube1, tube2))
118 }
119
120 // Create a new `Tube`.
121 pub fn new(socket: StreamChannel) -> Tube {
122 Tube {
123 socket,
124 target_pid: None,
125 }
126 }
127
128 pub(super) fn try_clone(&self) -> Result<Self> {
129 Ok(Tube {
130 socket: self.socket.try_clone().map_err(Error::Clone)?,
131 target_pid: self.target_pid,
132 })
133 }
134
135 pub fn send<T: Serialize>(&self, msg: &T) -> Result<()> {
136 serialize_and_send(|buf| self.socket.write_immutable(buf), msg, self.target_pid)
137 }
138
139 pub fn recv<T: DeserializeOwned>(&self) -> Result<T> {
Vikram Auradkarde1f0062022-03-25 13:44:28 -0700140 deserialize_and_recv(|buf| (&self.socket).read(buf))
Noah Goldc2867722022-03-18 16:04:25 -0700141 }
142
143 /// NOTE: On Windows this will only succeed if called on a server pipe. See #pair
144 /// documentation to ensure you have a server pipe before calling.
145 #[cfg(windows)]
146 pub fn flush_blocking(&mut self) -> Result<()> {
147 self.socket.flush_blocking().map_err(Error::Flush)
148 }
149
150 /// For Tubes that span processes, this method must be used to set the PID of the other end
151 /// of the Tube, otherwise sending handles to the other end won't work.
152 pub fn set_target_pid(&mut self, target_pid: u32) {
153 self.target_pid = Some(target_pid);
154 }
155
156 /// Returns the PID of the process at the other end of the Tube, if any is set.
157 pub fn target_pid(&self) -> Option<u32> {
158 self.target_pid
159 }
160
161 /// TODO(b/145998747, b/184398671): this method should be removed.
162 pub fn set_send_timeout(&self, _timeout: Option<Duration>) -> Result<()> {
163 unimplemented!("To be removed/refactored upstream.");
164 }
165
166 /// TODO(b/145998747, b/184398671): this method should be removed.
167 pub fn set_recv_timeout(&self, _timeout: Option<Duration>) -> Result<()> {
168 unimplemented!("To be removed/refactored upstream.");
169 }
170}
171
172pub fn serialize_and_send<T: Serialize, F: Fn(&[u8]) -> io::Result<usize>>(
173 write_fn: F,
174 msg: &T,
175 target_pid: Option<u32>,
176) -> Result<()> {
177 let msg_serialize = SerializeDescriptors::new(&msg);
178 let msg_json = serde_json::to_vec(&msg_serialize).map_err(Error::Json)?;
179 let msg_descriptors = msg_serialize.into_descriptors();
180
181 let mut duped_descriptors = Vec::with_capacity(msg_descriptors.len());
182 for desc in msg_descriptors {
183 // Safe because these handles are guaranteed to be valid. Details:
184 // 1. They come from sys_util::descriptor_reflection::with_as_descriptor.
185 // 2. with_as_descriptor is intended to be applied to owned descriptor types (e.g. File,
186 // SafeDescriptor).
187 // 3. The owning object is borrowed by msg until sending is complete.
188 duped_descriptors.push(duplicate_handle(desc, target_pid)? as usize)
189 }
190
191 let descriptor_json = if duped_descriptors.is_empty() {
192 None
193 } else {
194 Some(serde_json::to_vec(&duped_descriptors).map_err(Error::Json)?)
195 };
196
197 let header = MsgHeader {
198 msg_json_size: msg_json.len(),
199 descriptor_json_size: descriptor_json.as_ref().map_or(0, |json| json.len()),
200 };
201
202 let mut data_packet = Cursor::new(Vec::with_capacity(
203 header.as_slice().len() + header.msg_json_size + header.descriptor_json_size,
204 ));
205 data_packet
206 .write(header.as_slice())
207 .map_err(Error::SendIoBuf)?;
208 data_packet
209 .write(msg_json.as_slice())
210 .map_err(Error::SendIoBuf)?;
211 if let Some(descriptor_json) = descriptor_json {
212 data_packet
213 .write(descriptor_json.as_slice())
214 .map_err(Error::SendIoBuf)?;
215 }
216
217 // Multiple writers (producers) are safe because each write is atomic.
218 let data_bytes = data_packet.into_inner();
219
220 write_fn(&data_bytes).map_err(Error::SendIo)?;
221 Ok(())
222}
223
224fn duplicate_handle(desc: RawHandle, target_pid: Option<u32>) -> Result<RawHandle> {
225 match target_pid {
226 Some(pid) => match &*DH_TUBE.lock() {
227 Some(tube) => tube.request_duplicate_handle(pid, desc),
228 None => {
229 win_util::duplicate_handle_with_target_pid(desc, pid).map_err(Error::DupDescriptor)
230 }
231 },
232 None => win_util::duplicate_handle(desc).map_err(Error::DupDescriptor),
233 }
234}
235
236/// Reads a part of a Tube packet asserting that it was correctly read. This means:
237/// * Treats partial "message" (transport framing) reads are Ok, as long as we filled our buffer.
238/// We use this to ignore errors when reading the message header, which has the lengths we need
239/// to allocate our buffers for the remainder of the message.
240/// * We filled the supplied buffer.
241fn perform_read<F: Fn(&mut [u8]) -> io::Result<usize>>(
242 read_fn: &F,
243 buf: &mut [u8],
244) -> io::Result<usize> {
245 let res = match read_fn(buf) {
246 Ok(s) => Ok(s),
247 Err(e)
248 if e.raw_os_error()
249 .map_or(false, |errno| errno == ERROR_MORE_DATA as i32) =>
250 {
251 Ok(buf.len())
252 }
253 Err(e) => Err(e),
254 };
255
256 let bytes_read = res?;
257 if bytes_read != buf.len() {
258 Err(io::Error::new(
259 io::ErrorKind::UnexpectedEof,
260 "failed to fill whole buffer",
261 ))
262 } else {
263 Ok(bytes_read)
264 }
265}
266
267/// Deserializes a Tube packet by calling the supplied read function. This function MUST
268/// assert that the buffer was filled.
269pub fn deserialize_and_recv<T: DeserializeOwned, F: Fn(&mut [u8]) -> io::Result<usize>>(
270 read_fn: F,
271) -> Result<T> {
272 let mut header_bytes = vec![0u8; mem::size_of::<MsgHeader>()];
273 perform_read(&read_fn, header_bytes.as_mut_slice()).map_err(Error::Recv)?;
274
275 // Safe because the header is always written by the send function, and only that function
276 // writes to this channel.
277 let header =
278 MsgHeader::from_slice(header_bytes.as_slice()).expect("Tube header failed to deserialize.");
279
280 let mut msg_json = vec![0u8; header.msg_json_size];
281 perform_read(&read_fn, msg_json.as_mut_slice()).map_err(Error::Recv)?;
282
283 if msg_json.is_empty() {
284 // This means we got a message header, but there is no json body (due to a zero size in
285 // the header). This should never happen because it means the receiver is getting no
286 // data whatsoever from the sender.
287 return Err(Error::RecvUnexpectedEmptyBody);
288 }
289
290 let msg_descriptors: Vec<RawDescriptor> = if header.descriptor_json_size > 0 {
291 let mut msg_descriptors_json = vec![0u8; header.descriptor_json_size];
292 perform_read(&read_fn, msg_descriptors_json.as_mut_slice()).map_err(Error::Recv)?;
293 let descriptor_usizes: Vec<usize> =
294 serde_json::from_slice(msg_descriptors_json.as_slice()).map_err(Error::Json)?;
295
296 // Safe because the usizes are RawDescriptors that were converted to usize in the send
297 // method.
298 descriptor_usizes
299 .iter()
300 .map(|item| *item as RawDescriptor)
301 .collect()
302 } else {
303 Vec::new()
304 };
305
306 let mut msg_descriptors_safe = msg_descriptors
307 .into_iter()
308 .map(|v| {
309 Some(unsafe {
310 // Safe because the socket returns new fds that are owned locally by this scope.
311 SafeDescriptor::from_raw_descriptor(v)
312 })
313 })
314 .collect();
315
316 deserialize_with_descriptors(
317 || serde_json::from_slice(&msg_json),
318 &mut msg_descriptors_safe,
319 )
320 .map_err(Error::Json)
321}
322
323#[derive(PollToken, Eq, PartialEq, Copy, Clone)]
324enum Token {
325 SocketReady,
326}
327
328impl AsRawDescriptor for Tube {
329 fn as_raw_descriptor(&self) -> RawDescriptor {
330 self.socket.as_raw_descriptor()
331 }
332}
333
334impl AsRawHandle for Tube {
335 fn as_raw_handle(&self) -> RawHandle {
336 self.as_raw_descriptor()
337 }
338}
339
340impl ReadNotifier for Tube {
341 fn get_read_notifier(&self) -> &dyn AsRawDescriptor {
342 self.socket.get_read_notifier()
343 }
344}
345
346impl CloseNotifier for Tube {
347 fn get_close_notifier(&self) -> &dyn AsRawDescriptor {
348 self.socket.get_close_notifier()
349 }
350}
351
352impl AsRawHandle for SendTube {
353 fn as_raw_handle(&self) -> RawHandle {
354 self.0.as_raw_descriptor()
355 }
356}
357
358impl AsRawHandle for RecvTube {
359 fn as_raw_handle(&self) -> RawHandle {
360 self.0.as_raw_descriptor()
361 }
362}
363
364/// A request to duplicate a handle to a target process.
365#[derive(Serialize, Deserialize, Debug)]
366pub struct DuplicateHandleRequest {
367 pub target_alias_pid: u32,
368 pub handle: usize,
369}
370
371/// Contains a duplicated handle or None if an error occurred.
372#[derive(Serialize, Deserialize, Debug)]
373pub struct DuplicateHandleResponse {
374 pub handle: Option<usize>,
375}
376
377/// Wrapper for tube which is used to delegate DuplicateHandle function calls to
378/// the broker process.
379#[derive(Serialize, Deserialize, Debug)]
380pub struct DuplicateHandleTube(Tube);
381
382impl DuplicateHandleTube {
383 pub fn new(tube: Tube) -> Self {
384 Self(tube)
385 }
386
387 pub fn request_duplicate_handle(
388 &self,
389 target_alias_pid: u32,
390 handle: RawHandle,
391 ) -> Result<RawHandle> {
392 let req = DuplicateHandleRequest {
393 target_alias_pid,
394 handle: handle as usize,
395 };
396 self.0.send(&req)?;
397 let res: DuplicateHandleResponse = self.0.recv()?;
398 res.handle
399 .map(|h| h as RawHandle)
400 .ok_or(Error::BrokerDupDescriptor)
401 }
402}
403
404#[cfg(test)]
405mod tests {
406 use super::*;
407 use crate::{EventContext, EventTrigger, PollToken, ReadNotifier};
408 use std::time;
409
410 const EVENT_WAIT_TIME: time::Duration = time::Duration::from_secs(10);
411
412 #[derive(PollToken, Debug, Eq, PartialEq, Copy, Clone)]
413 enum Token {
414 ReceivedData,
415 }
416
417 #[test]
418 fn test_serialize_tube() {
419 let (tube_1, tube_2) = Tube::pair().unwrap();
420 let event_ctx: EventContext<Token> = EventContext::build_with(&[EventTrigger::from(
421 tube_2.get_read_notifier(),
422 Token::ReceivedData,
423 )])
424 .unwrap();
425
426 // Serialize the Tube
427 let msg_serialize = SerializeDescriptors::new(&tube_1);
428 let serialized = serde_json::to_vec(&msg_serialize).unwrap();
429 let msg_descriptors = msg_serialize.into_descriptors();
430
431 // Deserialize the Tube
432 let mut msg_descriptors_safe = msg_descriptors
433 .into_iter()
434 .map(|v| Some(unsafe { SafeDescriptor::from_raw_descriptor(v) }))
435 .collect();
436 let tube_deserialized: Tube = deserialize_with_descriptors(
437 || serde_json::from_slice(&serialized),
438 &mut msg_descriptors_safe,
439 )
440 .unwrap();
441
442 // Send a message through deserialized Tube
443 tube_deserialized.send(&"hi".to_string()).unwrap();
444
445 assert_eq!(event_ctx.wait_timeout(EVENT_WAIT_TIME).unwrap().len(), 1);
446 assert_eq!(tube_2.recv::<String>().unwrap(), "hi");
447 }
448}