crosvm: use seqpacket rather than datagram sockets
The advantage of seqpacket sockets is that they are connection
oriented: a listener can be created that accepts new connections, which
is useful for the path-based VM control sockets. Previously, the only
bidirectional sockets in crosvm were either stream based or made using
socketpair. This change also whitelists sendmsg and recvmsg in the
common device policy.
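For illustration, the listener pattern this enables looks roughly like
the following (a sketch against the sys_util API used below; the path
handling and poll-loop handoff are placeholders):

    use sys_util::net::{UnixSeqpacket, UnixSeqpacketListener};

    fn serve(path: &str) -> std::io::Result<()> {
        // Bind a listener at a filesystem path. Unlike a bound datagram
        // socket, this accepts distinct, connected clients.
        let listener = UnixSeqpacketListener::bind(path)?;
        loop {
            // Each accepted socket is a bidirectional, message-oriented
            // connection to one client.
            let socket: UnixSeqpacket = listener.accept()?;
            // ... hand `socket` off to the VM control poll loop ...
            let _ = socket;
        }
    }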
TEST=cargo test
BUG=chromium:848187
Change-Id: I83fd46f54bce105a7730632cd013b5e7047db22b
Reviewed-on: https://chromium-review.googlesource.com/1470917
Commit-Ready: Zach Reizner <zachr@chromium.org>
Tested-by: kokoro <noreply+kokoro@google.com>
Tested-by: Zach Reizner <zachr@chromium.org>
Reviewed-by: Daniel Verkamp <dverkamp@chromium.org>
diff --git a/src/linux.rs b/src/linux.rs
index 20ef297..2fdda09 100644
--- a/src/linux.rs
+++ b/src/linux.rs
@@ -11,7 +11,7 @@
use std::io::{self, stdin, Read};
use std::mem;
use std::os::unix::io::{FromRawFd, RawFd};
-use std::os::unix::net::{UnixDatagram, UnixStream};
+use std::os::unix::net::UnixStream;
use std::path::{Path, PathBuf};
use std::str;
use std::sync::{Arc, Barrier};
@@ -27,13 +27,17 @@
use io_jail::{self, Minijail};
use kvm::*;
use libcras::CrasClient;
-use msg_socket::{MsgReceiver, MsgSender, MsgSocket, UnlinkMsgSocket};
+use msg_socket::{MsgError, MsgReceiver, MsgSender, MsgSocket};
use net_util::{Error as NetError, Tap};
use qcow::{self, ImageType, QcowFile};
use rand_ish::SimpleRng;
use sync::{Condvar, Mutex};
-use sys_util;
-use sys_util::*;
+use sys_util::net::{UnixSeqpacket, UnixSeqpacketListener, UnlinkUnixSeqpacketListener};
+use sys_util::{
+ self, block_signal, clear_signal, flock, get_blocked_signals, get_group_id, get_user_id,
+ getegid, geteuid, register_signal_handler, validate_raw_fd, EventFd, FlockOperation,
+ GuestMemory, Killable, PollContext, PollToken, SignalFd, Terminal, TimerFd, SIGRTMIN,
+};
use vhost;
use vm_control::{VmRequest, VmResponse, VmRunMode};
@@ -223,9 +227,9 @@
cfg: Config,
mem: &GuestMemory,
_exit_evt: &EventFd,
- wayland_device_socket: UnixDatagram,
- balloon_device_socket: UnixDatagram,
- disk_device_sockets: &mut Vec<UnixDatagram>,
+ wayland_device_socket: UnixSeqpacket,
+ balloon_device_socket: UnixSeqpacket,
+ disk_device_sockets: &mut Vec<UnixSeqpacket>,
) -> std::result::Result<Vec<(Box<PciDevice + 'static>, Option<Minijail>)>, Box<error::Error>> {
let default_pivot_root: &str = option_env!("DEFAULT_PIVOT_ROOT").unwrap_or("/var/empty");
@@ -998,22 +1002,20 @@
wayland_dmabuf: cfg.wayland_dmabuf,
};
- let mut control_sockets = Vec::new();
- if let Some(ref path_string) = cfg.socket_path {
- let path = Path::new(path_string);
- let dgram = UnixDatagram::bind(path).map_err(Error::CreateSocket)?;
- control_sockets.push(UnlinkMsgSocket::<VmResponse, VmRequest>::new(
- UnlinkUnixDatagram(dgram),
- ));
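+ // With seqpacket, the path-based control socket is a listener that
+ // accepts new clients at runtime instead of a single pre-bound datagram.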
+ let control_server_socket = match &cfg.socket_path {
+ Some(path) => Some(UnlinkUnixSeqpacketListener(
+ UnixSeqpacketListener::bind(path).map_err(Error::CreateSocket)?,
+ )),
+ None => None,
};
+
+ let mut control_sockets = Vec::new();
let (wayland_host_socket, wayland_device_socket) =
- UnixDatagram::pair().map_err(Error::CreateSocket)?;
- control_sockets.push(UnlinkMsgSocket::<VmResponse, VmRequest>::new(
- UnlinkUnixDatagram(wayland_host_socket),
- ));
+ UnixSeqpacket::pair().map_err(Error::CreateSocket)?;
+ control_sockets.push(MsgSocket::<VmResponse, VmRequest>::new(wayland_host_socket));
// Balloon gets a special socket so balloon requests can be forwarded from the main process.
let (balloon_host_socket, balloon_device_socket) =
- UnixDatagram::pair().map_err(Error::CreateSocket)?;
+ UnixSeqpacket::pair().map_err(Error::CreateSocket)?;
// Create one control socket per disk.
let mut disk_device_sockets = Vec::new();
@@ -1021,7 +1023,7 @@
let disk_count = cfg.disks.len();
for _ in 0..disk_count {
let (disk_host_socket, disk_device_socket) =
- UnixDatagram::pair().map_err(Error::CreateSocket)?;
+ UnixSeqpacket::pair().map_err(Error::CreateSocket)?;
disk_device_sockets.push(disk_device_socket);
let disk_host_socket = MsgSocket::<VmRequest, VmResponse>::new(disk_host_socket);
disk_host_sockets.push(disk_host_socket);
@@ -1040,6 +1042,7 @@
.map_err(Error::BuildingVm)?;
run_control(
linux,
+ control_server_socket,
control_sockets,
balloon_host_socket,
&disk_host_sockets,
@@ -1049,8 +1052,9 @@
fn run_control(
mut linux: RunnableLinuxVm,
- control_sockets: Vec<UnlinkMsgSocket<VmResponse, VmRequest>>,
- balloon_host_socket: UnixDatagram,
+ control_server_socket: Option<UnlinkUnixSeqpacketListener>,
+ mut control_sockets: Vec<MsgSocket<VmResponse, VmRequest>>,
+ balloon_host_socket: UnixSeqpacket,
disk_host_sockets: &[MsgSocket<VmRequest, VmResponse>],
sigchld_fd: SignalFd,
) -> Result<()> {
@@ -1082,6 +1086,7 @@
CheckAvailableMemory,
LowMemory,
LowmemTimer,
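+ // Signaled when a new client connects to the control server socket.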
+ VmControlServer,
VmControl { index: usize },
}
@@ -1101,6 +1106,12 @@
poll_ctx
.add(&sigchld_fd, Token::ChildSignal)
.map_err(Error::PollContextAdd)?;
+
+ if let Some(socket_server) = &control_server_socket {
+ poll_ctx
+ .add(socket_server, Token::VmControlServer)
+ .map_err(Error::PollContextAdd)?;
+ }
for (index, socket) in control_sockets.iter().enumerate() {
poll_ctx
.add(socket.as_ref(), Token::VmControl { index })
@@ -1167,6 +1178,8 @@
}
}
};
+
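+ // Sockets found disconnected are queued here and removed only after
+ // event iteration, so indexes referenced by poll tokens stay valid.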
+ let mut vm_control_indices_to_remove = Vec::new();
for event in events.iter_readable() {
match event.token() {
Token::Exit => {
@@ -1286,6 +1299,24 @@
.map_err(Error::PollContextAdd)?;
}
}
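+ // A new client connected to the control server; accept it and
+ // register it for polling alongside the existing control sockets.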
+ Token::VmControlServer => {
+ if let Some(socket_server) = &control_server_socket {
+ match socket_server.accept() {
+ Ok(socket) => {
+ poll_ctx
+ .add(
+ &socket,
+ Token::VmControl {
+ index: control_sockets.len(),
+ },
+ )
+ .map_err(Error::PollContextAdd)?;
+ control_sockets.push(MsgSocket::new(socket));
+ }
+ Err(e) => error!("failed to accept socket: {}", e),
+ }
+ }
+ }
Token::VmControl { index } => {
if let Some(socket) = control_sockets.get(index) {
match socket.recv() {
@@ -1316,34 +1347,55 @@
}
}
}
- Err(e) => error!("failed to recv VmRequest: {}", e),
+ Err(e) => {
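+ // A zero-length recv on a connected socket means the peer
+ // disconnected; queue this socket for removal.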
+ if let MsgError::BadRecvSize { actual: 0, .. } = e {
+ vm_control_indices_to_remove.push(index);
+ } else {
+ error!("failed to recv VmRequest: {}", e);
+ }
+ }
}
}
}
}
}
+
for event in events.iter_hungup() {
- // It's possible more data is readable and buffered while the socket is hungup, so
- // don't delete the socket from the poll context until we're sure all the data is
- // read.
- if !event.readable() {
- match event.token() {
- Token::Exit => {}
- Token::Stdin => {
- let _ = poll_ctx.delete(&stdin_handle);
- }
- Token::ChildSignal => {}
- Token::CheckAvailableMemory => {}
- Token::LowMemory => {}
- Token::LowmemTimer => {}
- Token::VmControl { index } => {
- if let Some(socket) = control_sockets.get(index) {
- let _ = poll_ctx.delete(socket.as_ref());
- }
+ match event.token() {
+ Token::Exit => {}
+ Token::Stdin => {
+ let _ = poll_ctx.delete(&stdin_handle);
+ }
+ Token::ChildSignal => {}
+ Token::CheckAvailableMemory => {}
+ Token::LowMemory => {}
+ Token::LowmemTimer => {}
+ Token::VmControlServer => {}
+ Token::VmControl { index } => {
+ // It's possible more data is readable and buffered while the socket is hungup,
+ // so don't delete the socket from the poll context until we're sure all the
+ // data is read.
+ match control_sockets.get(index).map(|s| s.get_readable_bytes()) {
+ Some(Ok(0)) | Some(Err(_)) => vm_control_indices_to_remove.push(index),
+ Some(Ok(x)) => info!("control index {} has {} bytes readable", index, x),
+ _ => {}
}
}
}
}
+
+ // Sort in reverse so the highest indexes are removed first. This removal algorithm
+ // preserves correct indexes as each element is removed.
+ vm_control_indices_to_remove.sort_unstable_by(|a, b| b.cmp(a));
+ vm_control_indices_to_remove.dedup();
+ for index in vm_control_indices_to_remove {
+ control_sockets.swap_remove(index);
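+ // swap_remove moves the last socket into `index`; give the moved
+ // socket a poll token that matches its new position.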
+ if let Some(socket) = control_sockets.get(index) {
+ poll_ctx
+ .add(socket, Token::VmControl { index })
+ .map_err(Error::PollContextAdd)?;
+ }
+ }
}
// VCPU threads MUST see the VmRunMode flag, otherwise they may re-enter the VM.
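For reference, a client of the new control server would connect to the
same path and exchange framed messages (a hypothetical sketch built from
the UnixSeqpacket and MsgSocket APIs in this change; the function name
and error handling are illustrative):

    use msg_socket::{MsgReceiver, MsgSender, MsgSocket};
    use sys_util::net::UnixSeqpacket;
    use vm_control::{VmRequest, VmResponse};

    // Connect to the path the VM process bound its listener to, send
    // one request, and read back one response.
    fn send_request(path: &str, request: &VmRequest) {
        let socket = UnixSeqpacket::connect(path).expect("failed to connect");
        let sock = MsgSocket::<VmRequest, VmResponse>::new(socket);
        sock.send(request).expect("failed to send request");
        let _response: VmResponse = sock.recv().expect("failed to recv response");
    }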