ac97: Add AudioWorker and refactor start_audio
This CL
- simplifies ac97_bus_master audio thread spawn logic
- makes data ownership more clear for future maintenance
- saves several redundant lock calls
High level design:
ac97_bus_master <=> one AudioThreadInfo for each Ac97Function
<=> start / stop an AudioWorker
Changes in this CL:
- Add AudioWorker which contains the data, logic inside spawn::thread()
and some controls which own by AudioThreadInfo
- Remove `fn audio_thread`.
- Make AudioThreadInfo an audio thread control interface for each
Ac97Function
- `start` consumes an worker and spawn a running thread
- `stop` stops the thread and destroy the worker
- Add is_running support
- Combine several regs.lock() calls to save mutex lock time.
- Add create_audio_worker to create AudioWorker
BUG=b:173364323
TEST=Build and test audio in VMs
Change-Id: Iac8090fac12ac91f50b3e601efb918d79ba089af
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/2550480
Commit-Queue: Chih-Yang Hsia <paulhsia@chromium.org>
Tested-by: Chih-Yang Hsia <paulhsia@chromium.org>
Tested-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Dylan Reid <dgreid@chromium.org>
Reviewed-by: Fletcher Woodruff <fletcherw@chromium.org>
diff --git a/devices/src/pci/ac97_bus_master.rs b/devices/src/pci/ac97_bus_master.rs
index dc0729a..23d9ead 100644
--- a/devices/src/pci/ac97_bus_master.rs
+++ b/devices/src/pci/ac97_bus_master.rs
@@ -178,6 +178,40 @@
stream_control: None,
}
}
+
+ fn is_running(&self) -> bool {
+ self.thread_run.load(Ordering::Relaxed)
+ }
+
+ fn start(&mut self, mut worker: AudioWorker) {
+ const AUDIO_THREAD_RTPRIO: u16 = 10; // Matches other cros audio clients.
+ self.thread_run.store(true, Ordering::Relaxed);
+ self.thread = Some(thread::spawn(move || {
+ if let Err(e) = set_rt_prio_limit(u64::from(AUDIO_THREAD_RTPRIO))
+ .and_then(|_| set_rt_round_robin(i32::from(AUDIO_THREAD_RTPRIO)))
+ {
+ warn!("Failed to set audio thread to real time: {}", e);
+ }
+
+ if let Err(e) = worker.run() {
+ error!("{:?} error: {}", worker.func, e);
+ }
+
+ worker.thread_run.store(false, Ordering::Relaxed);
+ }));
+
+ self.stream_control = Some(Box::new(DummyStreamControl::new()));
+ }
+
+ fn stop(&mut self) {
+ self.thread_run.store(false, Ordering::Relaxed);
+ self.thread_semaphore.notify_one();
+ if let Some(thread) = self.thread.take() {
+ if let Err(e) = thread.join() {
+ error!("Failed to join thread: {:?}.", e);
+ }
+ }
+ }
}
/// `Ac97BusMaster` emulates the bus master portion of AC97. It exposes a register read/write
@@ -302,7 +336,7 @@
PO_SR_16 => regs.po_regs.sr,
PO_PICB_18 => {
// PO PICB
- if !self.po_info.thread_run.load(Ordering::Relaxed) {
+ if !self.thread_info(Ac97Function::Output).is_running() {
// Not running, no need to estimate what has been consumed.
regs.po_regs.picb
} else {
@@ -455,7 +489,7 @@
let cr = self.regs.lock().func_regs(func).cr;
if val & CR_RPBM == 0 {
// Run/Pause set to pause.
- self.stop_audio(func);
+ self.thread_info_mut(func).stop();
let mut regs = self.regs.lock();
regs.func_regs_mut(func).sr |= SR_DCH;
} else if cr & CR_RPBM == 0 {
@@ -490,10 +524,7 @@
if new_glob_cnt & GLOB_CNT_WARM_RESET != 0 {
// Check if running and if so, ignore. Warm reset is specified to no-op when the device
// is playing or recording audio.
- if !self.po_info.thread_run.load(Ordering::Relaxed)
- && !self.pi_info.thread_run.load(Ordering::Relaxed)
- && !self.pmic_info.thread_run.load(Ordering::Relaxed)
- {
+ if !self.is_audio_running() {
self.stop_all_audio();
let mut regs = self.regs.lock();
regs.glob_cnt = new_glob_cnt & !GLOB_CNT_WARM_RESET; // Auto-cleared reset bit.
@@ -517,29 +548,27 @@
}
}
- fn start_audio(&mut self, func: Ac97Function, mixer: &Ac97Mixer) -> AudioResult<()> {
- const AUDIO_THREAD_RTPRIO: u16 = 10; // Matches other cros audio clients.
- let sample_rate = self.current_sample_rate(func, mixer);
- let (direction, thread_info) = match func {
- Ac97Function::Microphone => (StreamDirection::Capture, &mut self.pmic_info),
- Ac97Function::Input => (StreamDirection::Capture, &mut self.pi_info),
- Ac97Function::Output => (StreamDirection::Playback, &mut self.po_info),
+ fn create_audio_worker(
+ &mut self,
+ mixer: &Ac97Mixer,
+ func: Ac97Function,
+ ) -> AudioResult<AudioWorker> {
+ let direction = match func {
+ Ac97Function::Microphone => StreamDirection::Capture,
+ Ac97Function::Input => StreamDirection::Capture,
+ Ac97Function::Output => StreamDirection::Playback,
};
- let buffer_samples = current_buffer_size(self.regs.lock().func_regs(func), &self.mem)?;
- let num_channels = self.regs.lock().channel_count(func);
+ let mut locked_regs = self.regs.lock();
+ let sample_rate = self.current_sample_rate(func, mixer);
+ let buffer_samples = current_buffer_size(locked_regs.func_regs(func), &self.mem)?;
+ let num_channels = locked_regs.channel_count(func);
let buffer_frames = buffer_samples / num_channels;
- thread_info.thread_run.store(true, Ordering::Relaxed);
- let thread_run = thread_info.thread_run.clone();
- let thread_semaphore = thread_info.thread_semaphore.clone();
- let thread_mem = self.mem.clone();
- let thread_regs = self.regs.clone();
let mut pending_buffers = VecDeque::with_capacity(2);
let starting_offsets = match direction {
StreamDirection::Capture => {
let mut offsets = [0, 0];
- let mut locked_regs = self.regs.lock();
for i in 0..2 {
let buffer = next_guest_buffer(&mut locked_regs, &self.mem, func, 0)?
.ok_or(AudioError::NoBufferAvailable)?;
@@ -564,55 +593,48 @@
)
.map_err(AudioError::CreateStream)?;
- thread_info.stream_control = Some(Box::new(DummyStreamControl::new()));
- thread_info.thread = Some(thread::spawn(move || {
- if let Err(e) = set_rt_prio_limit(u64::from(AUDIO_THREAD_RTPRIO))
- .and_then(|_| set_rt_round_robin(i32::from(AUDIO_THREAD_RTPRIO)))
- {
- warn!("Failed to set audio thread to real time: {}", e);
- }
-
- let message_interval =
- Duration::from_secs_f64(buffer_frames as f64 / sample_rate as f64);
-
- if let Err(e) = audio_thread(
- func,
- thread_regs,
- thread_mem,
- &thread_run,
- thread_semaphore,
- message_interval,
- stream,
- pending_buffers,
- ) {
- error!("{:?} error: {}", func, e);
- }
- thread_run.store(false, Ordering::Relaxed);
- }));
- self.update_mixer_settings(mixer);
-
- Ok(())
+ let params = AudioWorkerParams {
+ func,
+ stream,
+ pending_buffers,
+ message_interval: Duration::from_secs_f64(buffer_frames as f64 / sample_rate as f64),
+ };
+ Ok(AudioWorker::new(&self, params))
}
- fn stop_audio(&mut self, func: Ac97Function) {
- let thread_info = match func {
- Ac97Function::Microphone => &mut self.pmic_info,
- Ac97Function::Input => &mut self.pi_info,
- Ac97Function::Output => &mut self.po_info,
- };
- thread_info.thread_run.store(false, Ordering::Relaxed);
- thread_info.thread_semaphore.notify_one();
- if let Some(thread) = thread_info.thread.take() {
- if let Err(e) = thread.join() {
- error!("Failed to join {:?} thread: {:?}.", func, e);
- }
+ fn thread_info(&self, func: Ac97Function) -> &AudioThreadInfo {
+ match func {
+ Ac97Function::Microphone => &self.pmic_info,
+ Ac97Function::Input => &self.pi_info,
+ Ac97Function::Output => &self.po_info,
}
}
+ fn thread_info_mut(&mut self, func: Ac97Function) -> &mut AudioThreadInfo {
+ match func {
+ Ac97Function::Microphone => &mut self.pmic_info,
+ Ac97Function::Input => &mut self.pi_info,
+ Ac97Function::Output => &mut self.po_info,
+ }
+ }
+
+ fn is_audio_running(&self) -> bool {
+ self.thread_info(Ac97Function::Output).is_running()
+ || self.thread_info(Ac97Function::Input).is_running()
+ || self.thread_info(Ac97Function::Microphone).is_running()
+ }
+
+ fn start_audio(&mut self, func: Ac97Function, mixer: &Ac97Mixer) -> AudioResult<()> {
+ let audio_worker = self.create_audio_worker(mixer, func)?;
+ self.thread_info_mut(func).start(audio_worker);
+ self.update_mixer_settings(mixer);
+ Ok(())
+ }
+
fn stop_all_audio(&mut self) {
- self.stop_audio(Ac97Function::Input);
- self.stop_audio(Ac97Function::Output);
- self.stop_audio(Ac97Function::Microphone);
+ self.thread_info_mut(Ac97Function::Input).stop();
+ self.thread_info_mut(Ac97Function::Output).stop();
+ self.thread_info_mut(Ac97Function::Microphone).stop();
}
// Helper function for resetting function registers.
@@ -754,113 +776,142 @@
Ok(())
}
-// Runs and updates the offset within the stream shm where samples can be
-// found/placed for shm playback/capture streams, respectively
-fn audio_thread(
+struct AudioWorker {
func: Ac97Function,
regs: Arc<Mutex<Ac97BusMasterRegs>>,
mem: GuestMemory,
- thread_run: &AtomicBool,
+ thread_run: Arc<AtomicBool>,
lvi_semaphore: Arc<Condvar>,
message_interval: Duration,
- mut stream: Box<dyn ShmStream>,
- // A queue of the pending buffers at the server.
- mut pending_buffers: VecDeque<Option<GuestBuffer>>,
-) -> AudioResult<()> {
- // Set up picb.
- {
- let mut locked_regs = regs.lock();
- locked_regs.func_regs_mut(func).picb =
- current_buffer_size(locked_regs.func_regs(func), &mem)? as u16;
+ stream: Box<dyn ShmStream>,
+ pending_buffers: VecDeque<Option<GuestBuffer>>,
+}
+
+struct AudioWorkerParams {
+ func: Ac97Function,
+ stream: Box<dyn ShmStream>,
+ pending_buffers: VecDeque<Option<GuestBuffer>>,
+ message_interval: Duration,
+}
+
+impl AudioWorker {
+ fn new(bus_master: &Ac97BusMaster, args: AudioWorkerParams) -> Self {
+ Self {
+ func: args.func,
+ regs: bus_master.regs.clone(),
+ mem: bus_master.mem.clone(),
+ thread_run: bus_master.thread_info(args.func).thread_run.clone(),
+ lvi_semaphore: bus_master.thread_info(args.func).thread_semaphore.clone(),
+ message_interval: args.message_interval,
+ stream: args.stream,
+ pending_buffers: args.pending_buffers,
+ }
}
- 'audio_loop: while thread_run.load(Ordering::Relaxed) {
+ // Runs and updates the offset within the stream shm where samples can be
+ // found/placed for shm playback/capture streams, respectively
+ fn run(&mut self) -> AudioResult<()> {
+ let func = self.func;
+ let message_interval = self.message_interval;
+ // Set up picb.
{
- let mut locked_regs = regs.lock();
- while locked_regs.func_regs(func).sr & SR_DCH != 0 {
- locked_regs = lvi_semaphore.wait(locked_regs);
- if !thread_run.load(Ordering::Relaxed) {
- break 'audio_loop;
- }
- }
+ let mut locked_regs = self.regs.lock();
+ locked_regs.func_regs_mut(func).picb =
+ current_buffer_size(locked_regs.func_regs(func), &self.mem)? as u16;
}
- let timeout = Duration::from_secs(1);
- let action = stream
- .wait_for_next_action_with_timeout(timeout)
- .map_err(AudioError::WaitForAction)?;
-
- let request = match action {
- None => {
- warn!("No audio message received within timeout of {:?}", timeout);
- continue;
- }
- Some(request) => request,
- };
- let start = Instant::now();
-
- let next_buffer = {
- let mut locked_regs = regs.lock();
- if pending_buffers.len() == 2 {
- // When we have two pending buffers and receive a request for
- // another, we know that oldest buffer has been completed.
- // However, if that old buffer was an empty buffer we sent
- // because the guest driver had no available buffers, we don't
- // want to mark a buffer complete.
- if let Some(Some(_)) = pending_buffers.pop_front() {
- buffer_completed(&mut locked_regs, &mem, func)?;
+ 'audio_loop: while self.thread_run.load(Ordering::Relaxed) {
+ {
+ let mut locked_regs = self.regs.lock();
+ while locked_regs.func_regs(func).sr & SR_DCH != 0 {
+ locked_regs = self.lvi_semaphore.wait(locked_regs);
+ if !self.thread_run.load(Ordering::Relaxed) {
+ break 'audio_loop;
+ }
}
}
- // We count the number of pending, real buffers at the server, and
- // then use that as our offset from CIV.
- let offset = pending_buffers.iter().filter(|e| e.is_some()).count();
+ let timeout = Duration::from_secs(1);
+ let action = self
+ .stream
+ .wait_for_next_action_with_timeout(timeout)
+ .map_err(AudioError::WaitForAction)?;
- // Get a buffer to respond to our request. If there's no buffer
- // available, we'll wait one buffer interval and check again.
- loop {
- if let Some(buffer) = next_guest_buffer(&mut locked_regs, &mem, func, offset)? {
- break Some(buffer);
+ let request = match action {
+ None => {
+ warn!("No audio message received within timeout of {:?}", timeout);
+ continue;
}
- let elapsed = start.elapsed();
- if elapsed > message_interval {
- break None;
- }
- locked_regs = lvi_semaphore
- .wait_timeout(locked_regs, message_interval - elapsed)
- .0;
- }
- };
+ Some(request) => request,
+ };
+ let start = Instant::now();
- match next_buffer {
- Some(ref buffer) => {
- let requested_frames = request.requested_frames();
- if requested_frames != buffer.frames {
- // We should be able to handle when the number of frames in
- // the buffer doesn't match the number of frames requested,
- // but we don't yet.
- warn!(
- "Stream requested {} frames but buffer had {} frames: {:?}",
- requested_frames, buffer.frames, buffer
- );
+ let next_buffer = {
+ let mut locked_regs = self.regs.lock();
+ if self.pending_buffers.len() == 2 {
+ // When we have two pending buffers and receive a request for
+ // another, we know that oldest buffer has been completed.
+ // However, if that old buffer was an empty buffer we sent
+ // because the guest driver had no available buffers, we don't
+ // want to mark a buffer complete.
+ if let Some(Some(_)) = self.pending_buffers.pop_front() {
+ buffer_completed(&mut locked_regs, &self.mem, self.func)?;
+ }
}
- request
- .set_buffer_offset_and_frames(
- buffer.offset,
- std::cmp::min(requested_frames, buffer.frames),
- )
- .map_err(AudioError::RespondRequest)?;
+ // We count the number of pending, real buffers at the server, and
+ // then use that as our offset from CIV.
+ let offset = self.pending_buffers.iter().filter(|e| e.is_some()).count();
+
+ // Get a buffer to respond to our request. If there's no buffer
+ // available, we'll wait one buffer interval and check again.
+ loop {
+ if let Some(buffer) =
+ next_guest_buffer(&mut locked_regs, &self.mem, func, offset)?
+ {
+ break Some(buffer);
+ }
+ let elapsed = start.elapsed();
+ if elapsed > message_interval {
+ break None;
+ }
+ locked_regs = self
+ .lvi_semaphore
+ .wait_timeout(locked_regs, message_interval - elapsed)
+ .0;
+ }
+ };
+
+ match next_buffer {
+ Some(ref buffer) => {
+ let requested_frames = request.requested_frames();
+ if requested_frames != buffer.frames {
+ // We should be able to handle when the number of frames in
+ // the buffer doesn't match the number of frames requested,
+ // but we don't yet.
+ warn!(
+ "Stream requested {} frames but buffer had {} frames: {:?}",
+ requested_frames, buffer.frames, buffer
+ );
+ }
+
+ request
+ .set_buffer_offset_and_frames(
+ buffer.offset,
+ std::cmp::min(requested_frames, buffer.frames),
+ )
+ .map_err(AudioError::RespondRequest)?;
+ }
+ None => {
+ request
+ .ignore_request()
+ .map_err(AudioError::RespondRequest)?;
+ }
}
- None => {
- request
- .ignore_request()
- .map_err(AudioError::RespondRequest)?;
- }
+ self.pending_buffers.push_back(next_buffer);
}
- pending_buffers.push_back(next_buffer);
+ Ok(())
}
- Ok(())
}
// Update the status register and if any interrupts need to fire, raise them.