FAFT: Extract version from EC and BIOS images and verify after programming.

Modified firmware_install() to extract version strings from EC and BIOS
images and verify they match on the DUT after programming firmware.

BUG=b:147248894
TEST=Ran with local tarball, checked firmware was correctly flashed to DUT
     and versions match.

Change-Id: I8d944c81b5127f25d17859ba0c55a89989888e45
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/third_party/autotest/+/2006022
Tested-by: Brent Peterson <brentpeterson@chromium.org>
Commit-Queue: Brent Peterson <brentpeterson@chromium.org>
Reviewed-by: Mary Ruthven <mruthven@chromium.org>
diff --git a/server/cros/servo/servo.py b/server/cros/servo/servo.py
index ce9e605..97c7d49 100644
--- a/server/cros/servo/servo.py
+++ b/server/cros/servo/servo.py
@@ -47,6 +47,12 @@
     @return: The first path from the image candidates, which succeeds, or None
              if all the image candidates fail.
     """
+
+    # Create the firmware_name subdirectory if it doesn't exist
+    if not os.path.exists(dest_dir):
+        os.mkdir(dest_dir)
+
+    # Try to extract image candidates from tarball
     for image in image_candidates:
         status = server_utils.system(
                 ('tar xf %s -C %s %s' % (tarball, dest_dir, image)),
@@ -1131,60 +1137,15 @@
             self._programmer.program_ec(image)
 
 
-    def _reprogram(self, tarball_path, firmware_name, image_candidates,
-                   rw_only):
-        """Helper function to reprogram firmware for EC or BIOS.
-
-        @param tarball_path: The path of the downloaded build tarball.
-        @param: firmware_name: either 'EC' or 'BIOS'.
-        @param image_candidates: A tuple of the paths of image candidates.
-        @param rw_only: True to only install firmware to its RW portions. Keep
-                the RO portions unchanged.
-
-        @raise: TestError if cannot extract firmware from the tarball.
-        """
-        dest_dir = os.path.join(os.path.dirname(tarball_path), firmware_name)
-        # Create the firmware_name subdirectory if it doesn't exist.
-        if not os.path.exists(dest_dir):
-            os.mkdir(dest_dir)
-        image = _extract_image_from_tarball(tarball_path, dest_dir,
-                                            image_candidates)
-        if not image:
-            if firmware_name == 'EC':
-                logging.info('Not a Chrome EC, ignore re-programming it')
-                return
-            else:
-                raise error.TestError('Failed to extract the %s image from '
-                                      'tarball' % firmware_name)
-
-        # Extract subsidiary binaries for EC
-        if firmware_name == 'EC':
-            # Find a monitor binary for NPCX_UUT chip type, if any.
-            mon_candidates = [ w.replace('ec.bin', 'npcx_monitor.bin')
-                                   for w in image_candidates ]
-            _extract_image_from_tarball(tarball_path, dest_dir, mon_candidates)
-
-        logging.info('Will re-program %s %snow', firmware_name,
-                     'RW ' if rw_only else '')
-
-        if firmware_name == 'EC':
-            self.program_ec(os.path.join(dest_dir, image), rw_only)
-        else:
-            self.program_bios(os.path.join(dest_dir, image), rw_only)
-
-
-    def program_firmware(self, board, model, tarball_path, rw_only=False):
-        """Program firmware (EC, if applied, and BIOS) of the DUT.
+    def extract_ec_image(self, board, model, tarball_path):
+        """Helper function to extract EC image from downloaded tarball.
 
         @param board: The DUT board name.
         @param model: The DUT model name.
         @param tarball_path: The path of the downloaded build tarball.
-        @param rw_only: True to only install firmware to its RW portions. Keep
-                the RO portions unchanged.
-        """
-        ap_image_candidates = ('image.bin', 'image-%s.bin' % model,
-                               'image-%s.bin' % board)
 
+        @return: Path to extracted EC image.
+        """
         # Best effort; try to retrieve the EC board from the version as
         # reported by the EC.
         ec_board = None
@@ -1194,16 +1155,44 @@
           logging.info('Failed to get ec_board value; ignoring')
           pass
 
-        ec_image_candidates = ['ec.bin', '%s/ec.bin' % model,
+        # Array of candidates for EC image
+        ec_image_candidates = ['ec.bin',
+                               '%s/ec.bin' % model,
                                '%s/ec.bin' % board]
         if ec_board:
           ec_image_candidates.append('%s/ec.bin' % ec_board)
 
-        self._reprogram(tarball_path, 'EC', ec_image_candidates, rw_only)
-        self._reprogram(tarball_path, 'BIOS', ap_image_candidates, rw_only)
+        # Extract EC image from tarball
+        dest_dir = os.path.join(os.path.dirname(tarball_path), 'EC')
+        ec_image = _extract_image_from_tarball(tarball_path, dest_dir,
+                                               ec_image_candidates)
 
-        self.get_power_state_controller().reset()
-        time.sleep(Servo.BOOT_DELAY)
+        # Return path to EC image
+        return os.path.join(dest_dir, ec_image)
+
+
+    def extract_bios_image(self, board, model, tarball_path):
+        """Helper function to extract BIOS image from downloaded tarball.
+
+        @param board: The DUT board name.
+        @param model: The DUT model name.
+        @param tarball_path: The path of the downloaded build tarball.
+
+        @return: Path to extracted BIOS image.
+        """
+
+        # Array of candidates for BIOS image
+        bios_image_candidates = ['image.bin',
+                                 'image-%s.bin' % model,
+                                 'image-%s.bin' % board]
+
+        # Extract BIOS image from tarball
+        dest_dir = os.path.join(os.path.dirname(tarball_path), 'BIOS')
+        bios_image = _extract_image_from_tarball(tarball_path, dest_dir,
+                                                 bios_image_candidates)
+
+        # Return path to BIOS image
+        return os.path.join(dest_dir, bios_image)
 
 
     def _switch_usbkey_power(self, power_state, detection_delay=False):
diff --git a/server/hosts/cros_host.py b/server/hosts/cros_host.py
index 7f3ed84..f60bb3b 100644
--- a/server/hosts/cros_host.py
+++ b/server/hosts/cros_host.py
@@ -150,6 +150,12 @@
     _FW_IMAGE_URL_PATTERN = CONFIG.get_config_value(
             'CROS', 'firmware_url_pattern', type=str)
 
+    # Regular expression for extracting EC version string
+    _EC_REGEX = '(%s_\w*[-\.]\w*[-\.]\w*[-\.]\w*)'
+
+    # Regular expression for extracting BIOS version string
+    _BIOS_REGEX = '(%s\.\w*\.\w*\.\w*)'
+
     @staticmethod
     def check_host(host, timeout=10):
         """
@@ -651,8 +657,19 @@
         else:
             raise error.AutoservError('Cannot find the latest firmware')
 
+    @staticmethod
+    def get_version_from_image(image, version_regex):
+        with open(image, 'rb') as f:
+            image_data = f.read()
+        match = re.findall(version_regex, image_data)
+        if match:
+            return match[0]
+        else:
+            raise error.TestFail('Failed to read version from %s.' % image)
+
+
     def firmware_install(self, build=None, rw_only=False, dest=None,
-                         local_tarball=None):
+                         local_tarball=None, verify_version=False):
         """Install firmware to the DUT.
 
         Use stateful update if the DUT is already running the same build.
@@ -673,6 +690,8 @@
         @param dest: Directory to store the firmware in.
         @param local_tarball: Path to local firmware image for installing
                               without devserver.
+        @param verify_version: True to verify EC and BIOS versions after
+                               programming firmware, default is False.
 
         TODO(dshi): After bug 381718 is fixed, update here with corresponding
                     exceptions that could be raised.
@@ -717,12 +736,52 @@
             local_tarball = os.path.join(dest, os.path.basename(fwurl))
             ds.download_file(fwurl, local_tarball)
 
-        # Program firmware using servo
+        # Extract EC image from tarball
+        logging.info('Extracting EC image.')
+        ec_image = self.servo.extract_ec_image(board, model, local_tarball)
+
+        # Extract BIOS image from tarball
+        logging.info('Extracting BIOS image.')
+        bios_image = self.servo.extract_bios_image(board, model, local_tarball)
+
         try:
+            # Program firmware using servo
             self._clear_fw_version_labels(rw_only)
-            self.servo.program_firmware(board, model, local_tarball, rw_only)
+            self.servo.program_ec(ec_image, rw_only)
+            self.servo.program_bios(bios_image, rw_only)
             if utils.host_is_in_lab_zone(self.hostname):
                 self._add_fw_version_label(build, rw_only)
+
+            # Reboot and wait for DUT after installing firmware
+            logging.info('Rebooting DUT.')
+            self.servo.get_power_state_controller().reset()
+            time.sleep(self.servo.BOOT_DELAY)
+            self.test_wait_for_boot()
+
+            # When enabled verify EC and BIOS firmware version after programming
+            if verify_version:
+                # Check programmed EC firmware against expected version
+                logging.info('Checking EC firmware version.')
+                dest_ec_version = self.get_ec_version()
+                ec_regex = self._EC_REGEX % model
+                image_ec_version = self.get_version_from_image(ec_image,
+                                                               ec_regex)
+                if dest_ec_version != image_ec_version:
+                    raise error.TestFail(
+                        'Failed to update EC RO, version %s (expected %s)' %
+                        (dest_ec_version, image_ec_version))
+
+                # Check programmed BIOS firmware against expected version
+                logging.info('Checking BIOS firmware version.')
+                dest_bios_version = self.get_firmware_version()
+                bios_version_prefix = dest_bios_version.split('.', 1)[0]
+                bios_regex = self._BIOS_REGEX % bios_version_prefix
+                image_bios_version = self.get_version_from_image(bios_image,
+                                                                 bios_regex)
+                if dest_bios_version != image_bios_version:
+                    raise error.TestFail(
+                        'Failed to update BIOS RO, version %s (expected %s)' %
+                        (dest_bios_version, image_bios_version))
         finally:
             if tmpd:
                 tmpd.clean()
diff --git a/server/site_tests/provision_FirmwareUpdate/provision_FirmwareUpdate.py b/server/site_tests/provision_FirmwareUpdate/provision_FirmwareUpdate.py
index dc43926..c23f96f 100644
--- a/server/site_tests/provision_FirmwareUpdate/provision_FirmwareUpdate.py
+++ b/server/site_tests/provision_FirmwareUpdate/provision_FirmwareUpdate.py
@@ -34,26 +34,6 @@
             logging.debug('ChromeOS image %s is staged on the USB stick.',
                           info.build)
 
-    def get_ro_firmware_ver(self, host):
-        """Get the RO firmware version from the host."""
-        result = host.run('crossystem ro_fwid', ignore_status=True)
-        if result.exit_status == 0:
-            # The firmware ID is something like "Google_Board.1234.56.0".
-            # Remove the prefix "Google_Board".
-            return result.stdout.split('.', 1)[1]
-        else:
-            return None
-
-    def get_rw_firmware_ver(self, host):
-        """Get the RW firmware version from the host."""
-        result = host.run('crossystem fwid', ignore_status=True)
-        if result.exit_status == 0:
-            # The firmware ID is something like "Google_Board.1234.56.0".
-            # Remove the prefix "Google_Board".
-            return result.stdout.split('.', 1)[1]
-        else:
-            return None
-
     def run_once(self, host, value, rw_only=False, stage_image_to_usb=False,
                  fw_path=None):
         """The method called by the control file to start the test.
@@ -78,23 +58,11 @@
             if stage_image_to_usb:
                 self.stage_image_to_usb(host)
 
-            host.firmware_install(build=value, rw_only=rw_only,
-                                  dest=self.resultsdir, local_tarball=fw_path)
+            host.firmware_install(build=value,
+                                  rw_only=rw_only,
+                                  dest=self.resultsdir,
+                                  local_tarball=fw_path,
+                                  verify_version=True)
         except Exception as e:
             logging.error(e)
             raise error.TestFail, str(e), sys.exc_info()[2]
-
-        # DUT reboots after the above firmware_install(). Wait it to boot.
-        host.test_wait_for_boot()
-
-        # Only care about the version number.
-        firmware_ver = value.rsplit('-', 1)[1]
-        if not rw_only:
-            current_ro_ver = self.get_ro_firmware_ver(host)
-            if current_ro_ver != firmware_ver:
-                raise error.TestFail('Failed to update RO, still version %s' %
-                                     current_ro_ver)
-        current_rw_ver = self.get_rw_firmware_ver(host)
-        if current_rw_ver != firmware_ver:
-            raise error.TestFail('Failed to update RW, still version %s' %
-                                 current_rw_ver)