ac97: Add AudioWorker and refactor start_audio

This CL
- simplifies ac97_bus_master audio thread spawn logic
- makes data ownership more clear for future maintenance
- saves several redundant lock calls

High level design:
ac97_bus_master <=> one AudioThreadInfo for each Ac97Function
                <=> start / stop an AudioWorker

Changes in this CL:
- Add AudioWorker which contains the data, logic inside spawn::thread()
  and some controls which own by AudioThreadInfo
- Remove `fn audio_thread`.
- Make AudioThreadInfo an audio thread control interface for each
  Ac97Function
  - `start` consumes an worker and spawn a running thread
  - `stop` stops the thread and destroy the worker
  - Add is_running support
- Combine several regs.lock() calls to save mutex lock time.
- Add create_audio_worker to create AudioWorker

BUG=b:173364323
TEST=Build and test audio in VMs

Change-Id: Iac8090fac12ac91f50b3e601efb918d79ba089af
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/2550480
Commit-Queue: Chih-Yang Hsia <paulhsia@chromium.org>
Tested-by: Chih-Yang Hsia <paulhsia@chromium.org>
Tested-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Dylan Reid <dgreid@chromium.org>
Reviewed-by: Fletcher Woodruff <fletcherw@chromium.org>
diff --git a/devices/src/pci/ac97_bus_master.rs b/devices/src/pci/ac97_bus_master.rs
index dc0729a..23d9ead 100644
--- a/devices/src/pci/ac97_bus_master.rs
+++ b/devices/src/pci/ac97_bus_master.rs
@@ -178,6 +178,40 @@
             stream_control: None,
         }
     }
+
+    fn is_running(&self) -> bool {
+        self.thread_run.load(Ordering::Relaxed)
+    }
+
+    fn start(&mut self, mut worker: AudioWorker) {
+        const AUDIO_THREAD_RTPRIO: u16 = 10; // Matches other cros audio clients.
+        self.thread_run.store(true, Ordering::Relaxed);
+        self.thread = Some(thread::spawn(move || {
+            if let Err(e) = set_rt_prio_limit(u64::from(AUDIO_THREAD_RTPRIO))
+                .and_then(|_| set_rt_round_robin(i32::from(AUDIO_THREAD_RTPRIO)))
+            {
+                warn!("Failed to set audio thread to real time: {}", e);
+            }
+
+            if let Err(e) = worker.run() {
+                error!("{:?} error: {}", worker.func, e);
+            }
+
+            worker.thread_run.store(false, Ordering::Relaxed);
+        }));
+
+        self.stream_control = Some(Box::new(DummyStreamControl::new()));
+    }
+
+    fn stop(&mut self) {
+        self.thread_run.store(false, Ordering::Relaxed);
+        self.thread_semaphore.notify_one();
+        if let Some(thread) = self.thread.take() {
+            if let Err(e) = thread.join() {
+                error!("Failed to join thread: {:?}.", e);
+            }
+        }
+    }
 }
 
 /// `Ac97BusMaster` emulates the bus master portion of AC97. It exposes a register read/write
@@ -302,7 +336,7 @@
             PO_SR_16 => regs.po_regs.sr,
             PO_PICB_18 => {
                 // PO PICB
-                if !self.po_info.thread_run.load(Ordering::Relaxed) {
+                if !self.thread_info(Ac97Function::Output).is_running() {
                     // Not running, no need to estimate what has been consumed.
                     regs.po_regs.picb
                 } else {
@@ -455,7 +489,7 @@
             let cr = self.regs.lock().func_regs(func).cr;
             if val & CR_RPBM == 0 {
                 // Run/Pause set to pause.
-                self.stop_audio(func);
+                self.thread_info_mut(func).stop();
                 let mut regs = self.regs.lock();
                 regs.func_regs_mut(func).sr |= SR_DCH;
             } else if cr & CR_RPBM == 0 {
@@ -490,10 +524,7 @@
         if new_glob_cnt & GLOB_CNT_WARM_RESET != 0 {
             // Check if running and if so, ignore. Warm reset is specified to no-op when the device
             // is playing or recording audio.
-            if !self.po_info.thread_run.load(Ordering::Relaxed)
-                && !self.pi_info.thread_run.load(Ordering::Relaxed)
-                && !self.pmic_info.thread_run.load(Ordering::Relaxed)
-            {
+            if !self.is_audio_running() {
                 self.stop_all_audio();
                 let mut regs = self.regs.lock();
                 regs.glob_cnt = new_glob_cnt & !GLOB_CNT_WARM_RESET; // Auto-cleared reset bit.
@@ -517,29 +548,27 @@
         }
     }
 
-    fn start_audio(&mut self, func: Ac97Function, mixer: &Ac97Mixer) -> AudioResult<()> {
-        const AUDIO_THREAD_RTPRIO: u16 = 10; // Matches other cros audio clients.
-        let sample_rate = self.current_sample_rate(func, mixer);
-        let (direction, thread_info) = match func {
-            Ac97Function::Microphone => (StreamDirection::Capture, &mut self.pmic_info),
-            Ac97Function::Input => (StreamDirection::Capture, &mut self.pi_info),
-            Ac97Function::Output => (StreamDirection::Playback, &mut self.po_info),
+    fn create_audio_worker(
+        &mut self,
+        mixer: &Ac97Mixer,
+        func: Ac97Function,
+    ) -> AudioResult<AudioWorker> {
+        let direction = match func {
+            Ac97Function::Microphone => StreamDirection::Capture,
+            Ac97Function::Input => StreamDirection::Capture,
+            Ac97Function::Output => StreamDirection::Playback,
         };
 
-        let buffer_samples = current_buffer_size(self.regs.lock().func_regs(func), &self.mem)?;
-        let num_channels = self.regs.lock().channel_count(func);
+        let mut locked_regs = self.regs.lock();
+        let sample_rate = self.current_sample_rate(func, mixer);
+        let buffer_samples = current_buffer_size(locked_regs.func_regs(func), &self.mem)?;
+        let num_channels = locked_regs.channel_count(func);
         let buffer_frames = buffer_samples / num_channels;
-        thread_info.thread_run.store(true, Ordering::Relaxed);
-        let thread_run = thread_info.thread_run.clone();
-        let thread_semaphore = thread_info.thread_semaphore.clone();
-        let thread_mem = self.mem.clone();
-        let thread_regs = self.regs.clone();
 
         let mut pending_buffers = VecDeque::with_capacity(2);
         let starting_offsets = match direction {
             StreamDirection::Capture => {
                 let mut offsets = [0, 0];
-                let mut locked_regs = self.regs.lock();
                 for i in 0..2 {
                     let buffer = next_guest_buffer(&mut locked_regs, &self.mem, func, 0)?
                         .ok_or(AudioError::NoBufferAvailable)?;
@@ -564,55 +593,48 @@
             )
             .map_err(AudioError::CreateStream)?;
 
-        thread_info.stream_control = Some(Box::new(DummyStreamControl::new()));
-        thread_info.thread = Some(thread::spawn(move || {
-            if let Err(e) = set_rt_prio_limit(u64::from(AUDIO_THREAD_RTPRIO))
-                .and_then(|_| set_rt_round_robin(i32::from(AUDIO_THREAD_RTPRIO)))
-            {
-                warn!("Failed to set audio thread to real time: {}", e);
-            }
-
-            let message_interval =
-                Duration::from_secs_f64(buffer_frames as f64 / sample_rate as f64);
-
-            if let Err(e) = audio_thread(
-                func,
-                thread_regs,
-                thread_mem,
-                &thread_run,
-                thread_semaphore,
-                message_interval,
-                stream,
-                pending_buffers,
-            ) {
-                error!("{:?} error: {}", func, e);
-            }
-            thread_run.store(false, Ordering::Relaxed);
-        }));
-        self.update_mixer_settings(mixer);
-
-        Ok(())
+        let params = AudioWorkerParams {
+            func,
+            stream,
+            pending_buffers,
+            message_interval: Duration::from_secs_f64(buffer_frames as f64 / sample_rate as f64),
+        };
+        Ok(AudioWorker::new(&self, params))
     }
 
-    fn stop_audio(&mut self, func: Ac97Function) {
-        let thread_info = match func {
-            Ac97Function::Microphone => &mut self.pmic_info,
-            Ac97Function::Input => &mut self.pi_info,
-            Ac97Function::Output => &mut self.po_info,
-        };
-        thread_info.thread_run.store(false, Ordering::Relaxed);
-        thread_info.thread_semaphore.notify_one();
-        if let Some(thread) = thread_info.thread.take() {
-            if let Err(e) = thread.join() {
-                error!("Failed to join {:?} thread: {:?}.", func, e);
-            }
+    fn thread_info(&self, func: Ac97Function) -> &AudioThreadInfo {
+        match func {
+            Ac97Function::Microphone => &self.pmic_info,
+            Ac97Function::Input => &self.pi_info,
+            Ac97Function::Output => &self.po_info,
         }
     }
 
+    fn thread_info_mut(&mut self, func: Ac97Function) -> &mut AudioThreadInfo {
+        match func {
+            Ac97Function::Microphone => &mut self.pmic_info,
+            Ac97Function::Input => &mut self.pi_info,
+            Ac97Function::Output => &mut self.po_info,
+        }
+    }
+
+    fn is_audio_running(&self) -> bool {
+        self.thread_info(Ac97Function::Output).is_running()
+            || self.thread_info(Ac97Function::Input).is_running()
+            || self.thread_info(Ac97Function::Microphone).is_running()
+    }
+
+    fn start_audio(&mut self, func: Ac97Function, mixer: &Ac97Mixer) -> AudioResult<()> {
+        let audio_worker = self.create_audio_worker(mixer, func)?;
+        self.thread_info_mut(func).start(audio_worker);
+        self.update_mixer_settings(mixer);
+        Ok(())
+    }
+
     fn stop_all_audio(&mut self) {
-        self.stop_audio(Ac97Function::Input);
-        self.stop_audio(Ac97Function::Output);
-        self.stop_audio(Ac97Function::Microphone);
+        self.thread_info_mut(Ac97Function::Input).stop();
+        self.thread_info_mut(Ac97Function::Output).stop();
+        self.thread_info_mut(Ac97Function::Microphone).stop();
     }
 
     // Helper function for resetting function registers.
@@ -754,113 +776,142 @@
     Ok(())
 }
 
-// Runs and updates the offset within the stream shm where samples can be
-// found/placed for shm playback/capture streams, respectively
-fn audio_thread(
+struct AudioWorker {
     func: Ac97Function,
     regs: Arc<Mutex<Ac97BusMasterRegs>>,
     mem: GuestMemory,
-    thread_run: &AtomicBool,
+    thread_run: Arc<AtomicBool>,
     lvi_semaphore: Arc<Condvar>,
     message_interval: Duration,
-    mut stream: Box<dyn ShmStream>,
-    // A queue of the pending buffers at the server.
-    mut pending_buffers: VecDeque<Option<GuestBuffer>>,
-) -> AudioResult<()> {
-    // Set up picb.
-    {
-        let mut locked_regs = regs.lock();
-        locked_regs.func_regs_mut(func).picb =
-            current_buffer_size(locked_regs.func_regs(func), &mem)? as u16;
+    stream: Box<dyn ShmStream>,
+    pending_buffers: VecDeque<Option<GuestBuffer>>,
+}
+
+struct AudioWorkerParams {
+    func: Ac97Function,
+    stream: Box<dyn ShmStream>,
+    pending_buffers: VecDeque<Option<GuestBuffer>>,
+    message_interval: Duration,
+}
+
+impl AudioWorker {
+    fn new(bus_master: &Ac97BusMaster, args: AudioWorkerParams) -> Self {
+        Self {
+            func: args.func,
+            regs: bus_master.regs.clone(),
+            mem: bus_master.mem.clone(),
+            thread_run: bus_master.thread_info(args.func).thread_run.clone(),
+            lvi_semaphore: bus_master.thread_info(args.func).thread_semaphore.clone(),
+            message_interval: args.message_interval,
+            stream: args.stream,
+            pending_buffers: args.pending_buffers,
+        }
     }
 
-    'audio_loop: while thread_run.load(Ordering::Relaxed) {
+    // Runs and updates the offset within the stream shm where samples can be
+    // found/placed for shm playback/capture streams, respectively
+    fn run(&mut self) -> AudioResult<()> {
+        let func = self.func;
+        let message_interval = self.message_interval;
+        // Set up picb.
         {
-            let mut locked_regs = regs.lock();
-            while locked_regs.func_regs(func).sr & SR_DCH != 0 {
-                locked_regs = lvi_semaphore.wait(locked_regs);
-                if !thread_run.load(Ordering::Relaxed) {
-                    break 'audio_loop;
-                }
-            }
+            let mut locked_regs = self.regs.lock();
+            locked_regs.func_regs_mut(func).picb =
+                current_buffer_size(locked_regs.func_regs(func), &self.mem)? as u16;
         }
 
-        let timeout = Duration::from_secs(1);
-        let action = stream
-            .wait_for_next_action_with_timeout(timeout)
-            .map_err(AudioError::WaitForAction)?;
-
-        let request = match action {
-            None => {
-                warn!("No audio message received within timeout of {:?}", timeout);
-                continue;
-            }
-            Some(request) => request,
-        };
-        let start = Instant::now();
-
-        let next_buffer = {
-            let mut locked_regs = regs.lock();
-            if pending_buffers.len() == 2 {
-                // When we have two pending buffers and receive a request for
-                // another, we know that oldest buffer has been completed.
-                // However, if that old buffer was an empty buffer we sent
-                // because the guest driver had no available buffers, we don't
-                // want to mark a buffer complete.
-                if let Some(Some(_)) = pending_buffers.pop_front() {
-                    buffer_completed(&mut locked_regs, &mem, func)?;
+        'audio_loop: while self.thread_run.load(Ordering::Relaxed) {
+            {
+                let mut locked_regs = self.regs.lock();
+                while locked_regs.func_regs(func).sr & SR_DCH != 0 {
+                    locked_regs = self.lvi_semaphore.wait(locked_regs);
+                    if !self.thread_run.load(Ordering::Relaxed) {
+                        break 'audio_loop;
+                    }
                 }
             }
 
-            // We count the number of pending, real buffers at the server, and
-            // then use that as our offset from CIV.
-            let offset = pending_buffers.iter().filter(|e| e.is_some()).count();
+            let timeout = Duration::from_secs(1);
+            let action = self
+                .stream
+                .wait_for_next_action_with_timeout(timeout)
+                .map_err(AudioError::WaitForAction)?;
 
-            // Get a buffer to respond to our request. If there's no buffer
-            // available, we'll wait one buffer interval and check again.
-            loop {
-                if let Some(buffer) = next_guest_buffer(&mut locked_regs, &mem, func, offset)? {
-                    break Some(buffer);
+            let request = match action {
+                None => {
+                    warn!("No audio message received within timeout of {:?}", timeout);
+                    continue;
                 }
-                let elapsed = start.elapsed();
-                if elapsed > message_interval {
-                    break None;
-                }
-                locked_regs = lvi_semaphore
-                    .wait_timeout(locked_regs, message_interval - elapsed)
-                    .0;
-            }
-        };
+                Some(request) => request,
+            };
+            let start = Instant::now();
 
-        match next_buffer {
-            Some(ref buffer) => {
-                let requested_frames = request.requested_frames();
-                if requested_frames != buffer.frames {
-                    // We should be able to handle when the number of frames in
-                    // the buffer doesn't match the number of frames requested,
-                    // but we don't yet.
-                    warn!(
-                        "Stream requested {} frames but buffer had {} frames: {:?}",
-                        requested_frames, buffer.frames, buffer
-                    );
+            let next_buffer = {
+                let mut locked_regs = self.regs.lock();
+                if self.pending_buffers.len() == 2 {
+                    // When we have two pending buffers and receive a request for
+                    // another, we know that oldest buffer has been completed.
+                    // However, if that old buffer was an empty buffer we sent
+                    // because the guest driver had no available buffers, we don't
+                    // want to mark a buffer complete.
+                    if let Some(Some(_)) = self.pending_buffers.pop_front() {
+                        buffer_completed(&mut locked_regs, &self.mem, self.func)?;
+                    }
                 }
 
-                request
-                    .set_buffer_offset_and_frames(
-                        buffer.offset,
-                        std::cmp::min(requested_frames, buffer.frames),
-                    )
-                    .map_err(AudioError::RespondRequest)?;
+                // We count the number of pending, real buffers at the server, and
+                // then use that as our offset from CIV.
+                let offset = self.pending_buffers.iter().filter(|e| e.is_some()).count();
+
+                // Get a buffer to respond to our request. If there's no buffer
+                // available, we'll wait one buffer interval and check again.
+                loop {
+                    if let Some(buffer) =
+                        next_guest_buffer(&mut locked_regs, &self.mem, func, offset)?
+                    {
+                        break Some(buffer);
+                    }
+                    let elapsed = start.elapsed();
+                    if elapsed > message_interval {
+                        break None;
+                    }
+                    locked_regs = self
+                        .lvi_semaphore
+                        .wait_timeout(locked_regs, message_interval - elapsed)
+                        .0;
+                }
+            };
+
+            match next_buffer {
+                Some(ref buffer) => {
+                    let requested_frames = request.requested_frames();
+                    if requested_frames != buffer.frames {
+                        // We should be able to handle when the number of frames in
+                        // the buffer doesn't match the number of frames requested,
+                        // but we don't yet.
+                        warn!(
+                            "Stream requested {} frames but buffer had {} frames: {:?}",
+                            requested_frames, buffer.frames, buffer
+                        );
+                    }
+
+                    request
+                        .set_buffer_offset_and_frames(
+                            buffer.offset,
+                            std::cmp::min(requested_frames, buffer.frames),
+                        )
+                        .map_err(AudioError::RespondRequest)?;
+                }
+                None => {
+                    request
+                        .ignore_request()
+                        .map_err(AudioError::RespondRequest)?;
+                }
             }
-            None => {
-                request
-                    .ignore_request()
-                    .map_err(AudioError::RespondRequest)?;
-            }
+            self.pending_buffers.push_back(next_buffer);
         }
-        pending_buffers.push_back(next_buffer);
+        Ok(())
     }
-    Ok(())
 }
 
 // Update the status register and if any interrupts need to fire, raise them.