crosvm: use seqpacket rather than datagram sockets
The advantage of seqpacket is that they are connection oriented. A
listener can be created that accepts new connections, useful for the
path based VM control sockets. Previously, the only bidirectional
sockets in crosvm were either stream based or made using socketpair.
This change also whitelists sendmsg and recvmsg for the common device
policy.
TEST=cargo test
BUG=chromium:848187
Change-Id: I83fd46f54bce105a7730632cd013b5e7047db22b
Reviewed-on: https://chromium-review.googlesource.com/1470917
Commit-Ready: Zach Reizner <zachr@chromium.org>
Tested-by: kokoro <noreply+kokoro@google.com>
Tested-by: Zach Reizner <zachr@chromium.org>
Reviewed-by: Daniel Verkamp <dverkamp@chromium.org>
diff --git a/devices/src/proxy.rs b/devices/src/proxy.rs
index 4bc3405..a91d2ed 100644
--- a/devices/src/proxy.rs
+++ b/devices/src/proxy.rs
@@ -7,14 +7,14 @@
use libc::pid_t;
use std::os::unix::io::{AsRawFd, RawFd};
-use std::os::unix::net::UnixDatagram;
use std::process;
use std::time::Duration;
use std::{self, fmt, io};
-use msg_socket::{MsgOnSocket, MsgReceiver, MsgSender, MsgSocket};
-
use io_jail::{self, Minijail};
+use msg_socket::{MsgOnSocket, MsgReceiver, MsgSender, MsgSocket};
+use sys_util::net::UnixSeqpacket;
+
use BusDevice;
/// Errors for proxy devices.
@@ -64,7 +64,7 @@
ReadConfigResult(u32),
}
-fn child_proc(sock: UnixDatagram, device: &mut BusDevice) {
+fn child_proc(sock: UnixSeqpacket, device: &mut BusDevice) {
let mut running = true;
let sock = MsgSocket::<CommandResult, Command>::new(sock);
@@ -138,7 +138,7 @@
mut keep_fds: Vec<RawFd>,
) -> Result<ProxyDevice> {
let debug_label = device.debug_label();
- let (child_sock, parent_sock) = UnixDatagram::pair().map_err(Error::Io)?;
+ let (child_sock, parent_sock) = UnixSeqpacket::pair().map_err(Error::Io)?;
keep_fds.push(child_sock.as_raw_fd());
// Forking here is safe as long as the program is still single threaded.
diff --git a/devices/src/virtio/balloon.rs b/devices/src/virtio/balloon.rs
index 1dfb72c..e9f36f6 100644
--- a/devices/src/virtio/balloon.rs
+++ b/devices/src/virtio/balloon.rs
@@ -8,13 +8,14 @@
use std::io::Write;
use std::mem;
use std::os::unix::io::{AsRawFd, RawFd};
-use std::os::unix::net::UnixDatagram;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use byteorder::{ByteOrder, LittleEndian, ReadBytesExt, WriteBytesExt};
-use sys_util::{self, EventFd, GuestAddress, GuestMemory, PollContext, PollToken};
+use sys_util::{
+ self, net::UnixSeqpacket, EventFd, GuestAddress, GuestMemory, PollContext, PollToken,
+};
use super::{
DescriptorChain, Queue, VirtioDevice, INTERRUPT_STATUS_CONFIG_CHANGED,
@@ -67,7 +68,7 @@
interrupt_evt: EventFd,
interrupt_resample_evt: EventFd,
config: Arc<BalloonConfig>,
- command_socket: UnixDatagram,
+ command_socket: UnixSeqpacket,
}
fn valid_inflate_desc(desc: &DescriptorChain) -> bool {
@@ -230,7 +231,7 @@
/// Virtio device for memory balloon inflation/deflation.
pub struct Balloon {
- command_socket: Option<UnixDatagram>,
+ command_socket: Option<UnixSeqpacket>,
config: Arc<BalloonConfig>,
features: u64,
kill_evt: Option<EventFd>,
@@ -238,7 +239,7 @@
impl Balloon {
/// Create a new virtio balloon device.
- pub fn new(command_socket: UnixDatagram) -> Result<Balloon> {
+ pub fn new(command_socket: UnixSeqpacket) -> Result<Balloon> {
Ok(Balloon {
command_socket: Some(command_socket),
config: Arc::new(BalloonConfig {
diff --git a/devices/src/virtio/block.rs b/devices/src/virtio/block.rs
index 9dcf828..3c5d2fd 100644
--- a/devices/src/virtio/block.rs
+++ b/devices/src/virtio/block.rs
@@ -7,7 +7,6 @@
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::mem::{size_of, size_of_val};
use std::os::unix::io::{AsRawFd, RawFd};
-use std::os::unix::net::UnixDatagram;
use std::result;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
@@ -19,8 +18,8 @@
use sys_util::Error as SysError;
use sys_util::Result as SysResult;
use sys_util::{
- EventFd, FileSetLen, FileSync, GuestAddress, GuestMemory, GuestMemoryError, PollContext,
- PollToken, PunchHole, TimerFd, WriteZeroes,
+ net::UnixSeqpacket, EventFd, FileSetLen, FileSync, GuestAddress, GuestMemory, GuestMemoryError,
+ PollContext, PollToken, PunchHole, TimerFd, WriteZeroes,
};
use data_model::{DataInit, Le16, Le32, Le64};
@@ -695,7 +694,7 @@
self.interrupt_evt.write(1).unwrap();
}
- fn run(&mut self, queue_evt: EventFd, kill_evt: EventFd, control_socket: UnixDatagram) {
+ fn run(&mut self, queue_evt: EventFd, kill_evt: EventFd, control_socket: UnixSeqpacket) {
#[derive(PollToken)]
enum Token {
FlushTimer,
@@ -819,7 +818,7 @@
disk_size: Arc<Mutex<u64>>,
avail_features: u64,
read_only: bool,
- control_socket: Option<UnixDatagram>,
+ control_socket: Option<UnixSeqpacket>,
}
fn build_config_space(disk_size: u64) -> virtio_blk_config {
@@ -844,7 +843,7 @@
pub fn new(
mut disk_image: T,
read_only: bool,
- control_socket: Option<UnixDatagram>,
+ control_socket: Option<UnixSeqpacket>,
) -> SysResult<Block<T>> {
let disk_size = disk_image.seek(SeekFrom::End(0))? as u64;
if disk_size % SECTOR_SIZE != 0 {
diff --git a/devices/src/virtio/wl.rs b/devices/src/virtio/wl.rs
index 4d43825..0220d42 100644
--- a/devices/src/virtio/wl.rs
+++ b/devices/src/virtio/wl.rs
@@ -41,7 +41,7 @@
#[cfg(feature = "wl-dmabuf")]
use std::os::raw::{c_uint, c_ulonglong};
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
-use std::os::unix::net::{UnixDatagram, UnixStream};
+use std::os::unix::net::UnixStream;
use std::path::{Path, PathBuf};
use std::rc::Rc;
use std::result;
@@ -59,6 +59,7 @@
use msg_socket::{MsgError, MsgReceiver, MsgSender, MsgSocket};
#[cfg(feature = "wl-dmabuf")]
use resources::GpuMemoryDesc;
+use sys_util::net::UnixSeqpacket;
use sys_util::{
pipe, round_up_to_page_size, Error, EventFd, FileFlags, GuestAddress, GuestMemory,
GuestMemoryError, PollContext, PollToken, Result, ScmSocket, SharedMemory,
@@ -490,7 +491,7 @@
}
impl VmRequester {
- fn new(vm_socket: UnixDatagram) -> VmRequester {
+ fn new(vm_socket: UnixSeqpacket) -> VmRequester {
VmRequester {
inner: Rc::new(RefCell::new(MsgSocket::<VmRequest, VmResponse>::new(
vm_socket,
@@ -1004,7 +1005,7 @@
impl WlState {
fn new(
wayland_path: PathBuf,
- vm_socket: UnixDatagram,
+ vm_socket: UnixSeqpacket,
use_transition_flags: bool,
resource_bridge: Option<ResourceRequestSocket>,
) -> WlState {
@@ -1488,7 +1489,7 @@
in_queue: Queue,
out_queue: Queue,
wayland_path: PathBuf,
- vm_socket: UnixDatagram,
+ vm_socket: UnixSeqpacket,
use_transition_flags: bool,
resource_bridge: Option<ResourceRequestSocket>,
) -> Worker {
@@ -1678,7 +1679,7 @@
pub struct Wl {
kill_evt: Option<EventFd>,
wayland_path: PathBuf,
- vm_socket: Option<UnixDatagram>,
+ vm_socket: Option<UnixSeqpacket>,
resource_bridge: Option<ResourceRequestSocket>,
use_transition_flags: bool,
}
@@ -1686,7 +1687,7 @@
impl Wl {
pub fn new<P: AsRef<Path>>(
wayland_path: P,
- vm_socket: UnixDatagram,
+ vm_socket: UnixSeqpacket,
resource_bridge: Option<ResourceRequestSocket>,
) -> Result<Wl> {
Ok(Wl {
diff --git a/msg_socket/src/lib.rs b/msg_socket/src/lib.rs
index cc2cdc9..fd1f2bc 100644
--- a/msg_socket/src/lib.rs
+++ b/msg_socket/src/lib.rs
@@ -13,9 +13,9 @@
use std::io::Result;
use std::marker::PhantomData;
+use std::ops::Deref;
use std::os::unix::io::{AsRawFd, RawFd};
-use std::os::unix::net::UnixDatagram;
-use sys_util::{Error as SysError, ScmSocket, UnlinkUnixDatagram};
+use sys_util::{net::UnixSeqpacket, Error as SysError, ScmSocket};
pub use msg_on_socket::*;
pub use msg_on_socket_derive::*;
@@ -24,7 +24,7 @@
/// direction.
pub fn pair<Request: MsgOnSocket, Response: MsgOnSocket>(
) -> Result<(MsgSocket<Request, Response>, MsgSocket<Response, Request>)> {
- let (sock1, sock2) = UnixDatagram::pair()?;
+ let (sock1, sock2) = UnixSeqpacket::pair()?;
let requester = MsgSocket {
sock: sock1,
_i: PhantomData,
@@ -40,14 +40,14 @@
/// Bidirection sock that support both send and recv.
pub struct MsgSocket<I: MsgOnSocket, O: MsgOnSocket> {
- sock: UnixDatagram,
+ sock: UnixSeqpacket,
_i: PhantomData<I>,
_o: PhantomData<O>,
}
impl<I: MsgOnSocket, O: MsgOnSocket> MsgSocket<I, O> {
// Create a new MsgSocket.
- pub fn new(s: UnixDatagram) -> MsgSocket<I, O> {
+ pub fn new(s: UnixSeqpacket) -> MsgSocket<I, O> {
MsgSocket {
sock: s,
_i: PhantomData,
@@ -56,33 +56,22 @@
}
}
-/// Bidirection sock that support both send and recv.
-pub struct UnlinkMsgSocket<I: MsgOnSocket, O: MsgOnSocket> {
- sock: UnlinkUnixDatagram,
- _i: PhantomData<I>,
- _o: PhantomData<O>,
-}
-
-impl<I: MsgOnSocket, O: MsgOnSocket> UnlinkMsgSocket<I, O> {
- // Create a new MsgSocket.
- pub fn new(s: UnlinkUnixDatagram) -> UnlinkMsgSocket<I, O> {
- UnlinkMsgSocket {
- sock: s,
- _i: PhantomData,
- _o: PhantomData,
- }
+impl<I: MsgOnSocket, O: MsgOnSocket> Deref for MsgSocket<I, O> {
+ type Target = UnixSeqpacket;
+ fn deref(&self) -> &Self::Target {
+ &self.sock
}
}
/// One direction socket that only supports sending.
pub struct Sender<M: MsgOnSocket> {
- sock: UnixDatagram,
+ sock: UnixSeqpacket,
_m: PhantomData<M>,
}
impl<M: MsgOnSocket> Sender<M> {
/// Create a new sender sock.
- pub fn new(s: UnixDatagram) -> Sender<M> {
+ pub fn new(s: UnixSeqpacket) -> Sender<M> {
Sender {
sock: s,
_m: PhantomData,
@@ -92,13 +81,13 @@
/// One direction socket that only supports receiving.
pub struct Receiver<M: MsgOnSocket> {
- sock: UnixDatagram,
+ sock: UnixSeqpacket,
_m: PhantomData<M>,
}
impl<M: MsgOnSocket> Receiver<M> {
/// Create a new receiver sock.
- pub fn new(s: UnixDatagram) -> Receiver<M> {
+ pub fn new(s: UnixSeqpacket) -> Receiver<M> {
Receiver {
sock: s,
_m: PhantomData,
@@ -106,8 +95,8 @@
}
}
-impl<I: MsgOnSocket, O: MsgOnSocket> AsRef<UnixDatagram> for MsgSocket<I, O> {
- fn as_ref(&self) -> &UnixDatagram {
+impl<I: MsgOnSocket, O: MsgOnSocket> AsRef<UnixSeqpacket> for MsgSocket<I, O> {
+ fn as_ref(&self) -> &UnixSeqpacket {
&self.sock
}
}
@@ -118,20 +107,8 @@
}
}
-impl<I: MsgOnSocket, O: MsgOnSocket> AsRef<UnixDatagram> for UnlinkMsgSocket<I, O> {
- fn as_ref(&self) -> &UnixDatagram {
- self.sock.as_ref()
- }
-}
-
-impl<I: MsgOnSocket, O: MsgOnSocket> AsRawFd for UnlinkMsgSocket<I, O> {
- fn as_raw_fd(&self) -> RawFd {
- self.as_ref().as_raw_fd()
- }
-}
-
-impl<M: MsgOnSocket> AsRef<UnixDatagram> for Sender<M> {
- fn as_ref(&self) -> &UnixDatagram {
+impl<M: MsgOnSocket> AsRef<UnixSeqpacket> for Sender<M> {
+ fn as_ref(&self) -> &UnixSeqpacket {
&self.sock
}
}
@@ -142,8 +119,8 @@
}
}
-impl<M: MsgOnSocket> AsRef<UnixDatagram> for Receiver<M> {
- fn as_ref(&self) -> &UnixDatagram {
+impl<M: MsgOnSocket> AsRef<UnixSeqpacket> for Receiver<M> {
+ fn as_ref(&self) -> &UnixSeqpacket {
&self.sock
}
}
@@ -155,7 +132,7 @@
}
/// Types that could send a message.
-pub trait MsgSender<M: MsgOnSocket>: AsRef<UnixDatagram> {
+pub trait MsgSender<M: MsgOnSocket>: AsRef<UnixSeqpacket> {
fn send(&self, msg: &M) -> MsgResult<()> {
let msg_size = M::msg_size();
let fd_size = M::max_fd_count();
@@ -163,7 +140,7 @@
let mut fd_buffer: Vec<RawFd> = vec![0; fd_size];
let fd_size = msg.write_to_buffer(&mut msg_buffer, &mut fd_buffer)?;
- let sock: &UnixDatagram = self.as_ref();
+ let sock: &UnixSeqpacket = self.as_ref();
if fd_size == 0 {
handle_eintr!(sock.send(&msg_buffer))
.map_err(|e| MsgError::Send(SysError::new(e.raw_os_error().unwrap_or(0))))?;
@@ -176,14 +153,14 @@
}
/// Types that could receive a message.
-pub trait MsgReceiver<M: MsgOnSocket>: AsRef<UnixDatagram> {
+pub trait MsgReceiver<M: MsgOnSocket>: AsRef<UnixSeqpacket> {
fn recv(&self) -> MsgResult<M> {
let msg_size = M::msg_size();
let fd_size = M::max_fd_count();
let mut msg_buffer: Vec<u8> = vec![0; msg_size];
let mut fd_buffer: Vec<RawFd> = vec![0; fd_size];
- let sock: &UnixDatagram = self.as_ref();
+ let sock: &UnixSeqpacket = self.as_ref();
let (recv_msg_size, recv_fd_size) = {
if fd_size == 0 {
@@ -197,7 +174,10 @@
}
};
if msg_size != recv_msg_size {
- return Err(MsgError::BadRecvSize(msg_size));
+ return Err(MsgError::BadRecvSize {
+ expected: msg_size,
+ actual: recv_msg_size,
+ });
}
// Safe because fd buffer is read from socket.
let (v, read_fd_size) = unsafe {
@@ -213,8 +193,5 @@
impl<I: MsgOnSocket, O: MsgOnSocket> MsgSender<I> for MsgSocket<I, O> {}
impl<I: MsgOnSocket, O: MsgOnSocket> MsgReceiver<O> for MsgSocket<I, O> {}
-impl<I: MsgOnSocket, O: MsgOnSocket> MsgSender<I> for UnlinkMsgSocket<I, O> {}
-impl<I: MsgOnSocket, O: MsgOnSocket> MsgReceiver<O> for UnlinkMsgSocket<I, O> {}
-
impl<M: MsgOnSocket> MsgSender<M> for Sender<M> {}
impl<M: MsgOnSocket> MsgReceiver<M> for Receiver<M> {}
diff --git a/msg_socket/src/msg_on_socket.rs b/msg_socket/src/msg_on_socket.rs
index 493c57c..8b01850 100644
--- a/msg_socket/src/msg_on_socket.rs
+++ b/msg_socket/src/msg_on_socket.rs
@@ -23,8 +23,8 @@
/// The type of a received request or response is unknown.
InvalidType,
/// There was not the expected amount of data when receiving a message. The inner
- /// value is how much data is needed.
- BadRecvSize(usize),
+ /// value is how much data is expected and how much data was actually received.
+ BadRecvSize { expected: usize, actual: usize },
/// There was no associated file descriptor received for a request that expected it.
ExpectFd,
/// There was some associated file descriptor received but not used when deserialize.
@@ -47,7 +47,11 @@
Send(e) => write!(f, "failed to send request or response: {}", e),
Recv(e) => write!(f, "failed to receive request or response: {}", e),
InvalidType => write!(f, "invalid type"),
- BadRecvSize(n) => write!(f, "wrong amount of data received; expected {} bytes", n),
+ BadRecvSize { expected, actual } => write!(
+ f,
+ "wrong amount of data received; expected {} bytes; got {} bytes",
+ expected, actual
+ ),
ExpectFd => write!(f, "missing associated file descriptor for request"),
NotExpectFd => write!(f, "unexpected file descriptor is unused"),
WrongFdBufferSize => write!(f, "fd buffer size too small"),
diff --git a/seccomp/arm/common_device.policy b/seccomp/arm/common_device.policy
index 9ccf48b..d2b5a6b 100644
--- a/seccomp/arm/common_device.policy
+++ b/seccomp/arm/common_device.policy
@@ -32,11 +32,13 @@
read: 1
recv: 1
recvfrom: 1
+recvmsg: 1
restart_syscall: 1
rt_sigaction: 1
rt_sigprocmask: 1
rt_sigreturn: 1
sched_getaffinity: 1
+sendmsg: 1
set_robust_list: 1
sigaltstack: 1
write: 1
diff --git a/seccomp/x86_64/common_device.policy b/seccomp/x86_64/common_device.policy
index 7fa6e52..2379b95 100644
--- a/seccomp/x86_64/common_device.policy
+++ b/seccomp/x86_64/common_device.policy
@@ -31,11 +31,13 @@
prctl: arg0 == PR_SET_NAME
read: 1
recvfrom: 1
+recvmsg: 1
restart_syscall: 1
rt_sigaction: 1
rt_sigprocmask: 1
rt_sigreturn: 1
sched_getaffinity: 1
+sendmsg: 1
set_robust_list: 1
sigaltstack: 1
write: 1
diff --git a/src/linux.rs b/src/linux.rs
index 20ef297..2fdda09 100644
--- a/src/linux.rs
+++ b/src/linux.rs
@@ -11,7 +11,7 @@
use std::io::{self, stdin, Read};
use std::mem;
use std::os::unix::io::{FromRawFd, RawFd};
-use std::os::unix::net::{UnixDatagram, UnixStream};
+use std::os::unix::net::UnixStream;
use std::path::{Path, PathBuf};
use std::str;
use std::sync::{Arc, Barrier};
@@ -27,13 +27,17 @@
use io_jail::{self, Minijail};
use kvm::*;
use libcras::CrasClient;
-use msg_socket::{MsgReceiver, MsgSender, MsgSocket, UnlinkMsgSocket};
+use msg_socket::{MsgError, MsgReceiver, MsgSender, MsgSocket};
use net_util::{Error as NetError, Tap};
use qcow::{self, ImageType, QcowFile};
use rand_ish::SimpleRng;
use sync::{Condvar, Mutex};
-use sys_util;
-use sys_util::*;
+use sys_util::net::{UnixSeqpacket, UnixSeqpacketListener, UnlinkUnixSeqpacketListener};
+use sys_util::{
+ self, block_signal, clear_signal, flock, get_blocked_signals, get_group_id, get_user_id,
+ getegid, geteuid, register_signal_handler, validate_raw_fd, EventFd, FlockOperation,
+ GuestMemory, Killable, PollContext, PollToken, SignalFd, Terminal, TimerFd, SIGRTMIN,
+};
use vhost;
use vm_control::{VmRequest, VmResponse, VmRunMode};
@@ -223,9 +227,9 @@
cfg: Config,
mem: &GuestMemory,
_exit_evt: &EventFd,
- wayland_device_socket: UnixDatagram,
- balloon_device_socket: UnixDatagram,
- disk_device_sockets: &mut Vec<UnixDatagram>,
+ wayland_device_socket: UnixSeqpacket,
+ balloon_device_socket: UnixSeqpacket,
+ disk_device_sockets: &mut Vec<UnixSeqpacket>,
) -> std::result::Result<Vec<(Box<PciDevice + 'static>, Option<Minijail>)>, Box<error::Error>> {
let default_pivot_root: &str = option_env!("DEFAULT_PIVOT_ROOT").unwrap_or("/var/empty");
@@ -998,22 +1002,20 @@
wayland_dmabuf: cfg.wayland_dmabuf,
};
- let mut control_sockets = Vec::new();
- if let Some(ref path_string) = cfg.socket_path {
- let path = Path::new(path_string);
- let dgram = UnixDatagram::bind(path).map_err(Error::CreateSocket)?;
- control_sockets.push(UnlinkMsgSocket::<VmResponse, VmRequest>::new(
- UnlinkUnixDatagram(dgram),
- ));
+ let control_server_socket = match &cfg.socket_path {
+ Some(path) => Some(UnlinkUnixSeqpacketListener(
+ UnixSeqpacketListener::bind(path).map_err(Error::CreateSocket)?,
+ )),
+ None => None,
};
+
+ let mut control_sockets = Vec::new();
let (wayland_host_socket, wayland_device_socket) =
- UnixDatagram::pair().map_err(Error::CreateSocket)?;
- control_sockets.push(UnlinkMsgSocket::<VmResponse, VmRequest>::new(
- UnlinkUnixDatagram(wayland_host_socket),
- ));
+ UnixSeqpacket::pair().map_err(Error::CreateSocket)?;
+ control_sockets.push(MsgSocket::<VmResponse, VmRequest>::new(wayland_host_socket));
// Balloon gets a special socket so balloon requests can be forwarded from the main process.
let (balloon_host_socket, balloon_device_socket) =
- UnixDatagram::pair().map_err(Error::CreateSocket)?;
+ UnixSeqpacket::pair().map_err(Error::CreateSocket)?;
// Create one control socket per disk.
let mut disk_device_sockets = Vec::new();
@@ -1021,7 +1023,7 @@
let disk_count = cfg.disks.len();
for _ in 0..disk_count {
let (disk_host_socket, disk_device_socket) =
- UnixDatagram::pair().map_err(Error::CreateSocket)?;
+ UnixSeqpacket::pair().map_err(Error::CreateSocket)?;
disk_device_sockets.push(disk_device_socket);
let disk_host_socket = MsgSocket::<VmRequest, VmResponse>::new(disk_host_socket);
disk_host_sockets.push(disk_host_socket);
@@ -1040,6 +1042,7 @@
.map_err(Error::BuildingVm)?;
run_control(
linux,
+ control_server_socket,
control_sockets,
balloon_host_socket,
&disk_host_sockets,
@@ -1049,8 +1052,9 @@
fn run_control(
mut linux: RunnableLinuxVm,
- control_sockets: Vec<UnlinkMsgSocket<VmResponse, VmRequest>>,
- balloon_host_socket: UnixDatagram,
+ control_server_socket: Option<UnlinkUnixSeqpacketListener>,
+ mut control_sockets: Vec<MsgSocket<VmResponse, VmRequest>>,
+ balloon_host_socket: UnixSeqpacket,
disk_host_sockets: &[MsgSocket<VmRequest, VmResponse>],
sigchld_fd: SignalFd,
) -> Result<()> {
@@ -1082,6 +1086,7 @@
CheckAvailableMemory,
LowMemory,
LowmemTimer,
+ VmControlServer,
VmControl { index: usize },
}
@@ -1101,6 +1106,12 @@
poll_ctx
.add(&sigchld_fd, Token::ChildSignal)
.map_err(Error::PollContextAdd)?;
+
+ if let Some(socket_server) = &control_server_socket {
+ poll_ctx
+ .add(socket_server, Token::VmControlServer)
+ .map_err(Error::PollContextAdd)?;
+ }
for (index, socket) in control_sockets.iter().enumerate() {
poll_ctx
.add(socket.as_ref(), Token::VmControl { index })
@@ -1167,6 +1178,8 @@
}
}
};
+
+ let mut vm_control_indices_to_remove = Vec::new();
for event in events.iter_readable() {
match event.token() {
Token::Exit => {
@@ -1286,6 +1299,24 @@
.map_err(Error::PollContextAdd)?;
}
}
+ Token::VmControlServer => {
+ if let Some(socket_server) = &control_server_socket {
+ match socket_server.accept() {
+ Ok(socket) => {
+ poll_ctx
+ .add(
+ &socket,
+ Token::VmControl {
+ index: control_sockets.len(),
+ },
+ )
+ .map_err(Error::PollContextAdd)?;
+ control_sockets.push(MsgSocket::new(socket));
+ }
+ Err(e) => error!("failed to accept socket: {}", e),
+ }
+ }
+ }
Token::VmControl { index } => {
if let Some(socket) = control_sockets.get(index) {
match socket.recv() {
@@ -1316,34 +1347,55 @@
}
}
}
- Err(e) => error!("failed to recv VmRequest: {}", e),
+ Err(e) => {
+ if let MsgError::BadRecvSize { actual: 0, .. } = e {
+ vm_control_indices_to_remove.push(index);
+ } else {
+ error!("failed to recv VmRequest: {}", e);
+ }
+ }
}
}
}
}
}
+
for event in events.iter_hungup() {
- // It's possible more data is readable and buffered while the socket is hungup, so
- // don't delete the socket from the poll context until we're sure all the data is
- // read.
- if !event.readable() {
- match event.token() {
- Token::Exit => {}
- Token::Stdin => {
- let _ = poll_ctx.delete(&stdin_handle);
- }
- Token::ChildSignal => {}
- Token::CheckAvailableMemory => {}
- Token::LowMemory => {}
- Token::LowmemTimer => {}
- Token::VmControl { index } => {
- if let Some(socket) = control_sockets.get(index) {
- let _ = poll_ctx.delete(socket.as_ref());
- }
+ match event.token() {
+ Token::Exit => {}
+ Token::Stdin => {
+ let _ = poll_ctx.delete(&stdin_handle);
+ }
+ Token::ChildSignal => {}
+ Token::CheckAvailableMemory => {}
+ Token::LowMemory => {}
+ Token::LowmemTimer => {}
+ Token::VmControlServer => {}
+ Token::VmControl { index } => {
+ // It's possible more data is readable and buffered while the socket is hungup,
+ // so don't delete the socket from the poll context until we're sure all the
+ // data is read.
+ match control_sockets.get(index).map(|s| s.get_readable_bytes()) {
+ Some(Ok(0)) | Some(Err(_)) => vm_control_indices_to_remove.push(index),
+ Some(Ok(x)) => info!("control index {} has {} bytes readable", index, x),
+ _ => {}
}
}
}
}
+
+ // Sort in reverse so the highest indexes are removed first. This removal algorithm
+ // preserved correct indexes as each element is removed.
+ vm_control_indices_to_remove.sort_unstable_by(|a, b| b.cmp(a));
+ vm_control_indices_to_remove.dedup();
+ for index in vm_control_indices_to_remove {
+ control_sockets.swap_remove(index);
+ if let Some(socket) = control_sockets.get(index) {
+ poll_ctx
+ .add(socket, Token::VmControl { index })
+ .map_err(Error::PollContextAdd)?;
+ }
+ }
}
// VCPU threads MUST see the VmRunMode flag, otherwise they may re-enter the VM.
diff --git a/src/main.rs b/src/main.rs
index 4965deb..32380b7 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -46,14 +46,13 @@
use std::fs::OpenOptions;
use std::net;
use std::os::unix::io::RawFd;
-use std::os::unix::net::UnixDatagram;
use std::path::PathBuf;
use std::string::String;
use std::thread::sleep;
use std::time::Duration;
use qcow::QcowFile;
-use sys_util::{getpid, kill_process_group, reap_child, syslog};
+use sys_util::{getpid, kill_process_group, net::UnixSeqpacket, reap_child, syslog};
use argument::{print_help, set_arguments, Argument};
use msg_socket::{MsgSender, Sender};
@@ -723,10 +722,7 @@
let mut return_result = Ok(());
for socket_path in args {
- match UnixDatagram::unbound().and_then(|s| {
- s.connect(&socket_path)?;
- Ok(s)
- }) {
+ match UnixSeqpacket::connect(&socket_path) {
Ok(s) => {
let sender = Sender::<VmRequest>::new(s);
if let Err(e) = sender.send(request) {
@@ -788,10 +784,7 @@
let mut return_result = Ok(());
for socket_path in args {
- match UnixDatagram::unbound().and_then(|s| {
- s.connect(&socket_path)?;
- Ok(s)
- }) {
+ match UnixSeqpacket::connect(&socket_path) {
Ok(s) => {
let sender = Sender::<VmRequest>::new(s);
if let Err(e) = sender.send(&VmRequest::BalloonAdjust(num_bytes)) {
@@ -881,10 +874,7 @@
let mut return_result = Ok(());
for socket_path in args {
- match UnixDatagram::unbound().and_then(|s| {
- s.connect(&socket_path)?;
- Ok(s)
- }) {
+ match UnixSeqpacket::connect(&socket_path) {
Ok(s) => {
let sender = Sender::<VmRequest>::new(s);
if let Err(e) = sender.send(&request) {
diff --git a/vm_control/src/lib.rs b/vm_control/src/lib.rs
index 514a964..f0fc209 100644
--- a/vm_control/src/lib.rs
+++ b/vm_control/src/lib.rs
@@ -22,7 +22,6 @@
use std::fs::File;
use std::io::{Seek, SeekFrom};
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
-use std::os::unix::net::UnixDatagram;
use libc::{EINVAL, ENODEV};
@@ -30,7 +29,9 @@
use kvm::{Datamatch, IoeventAddress, Vm};
use msg_socket::{MsgOnSocket, MsgReceiver, MsgResult, MsgSender, MsgSocket};
use resources::{GpuMemoryDesc, SystemAllocator};
-use sys_util::{Error as SysError, EventFd, GuestAddress, MemoryMapping, MmapError, Result};
+use sys_util::{
+ net::UnixSeqpacket, Error as SysError, EventFd, GuestAddress, MemoryMapping, MmapError, Result,
+};
/// A file descriptor either borrowed or owned by this.
pub enum MaybeOwnedFd {
@@ -170,7 +171,7 @@
vm: &mut Vm,
sys_allocator: &mut SystemAllocator,
run_mode: &mut Option<VmRunMode>,
- balloon_host_socket: &UnixDatagram,
+ balloon_host_socket: &UnixSeqpacket,
disk_host_sockets: &[MsgSocket<VmRequest, VmResponse>],
) -> VmResponse {
match *self {