| # Copyright (c) 2012 The Chromium OS Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| |
| import logging, mmap, os, time |
| |
| import common |
| from autotest_lib.client.bin import os_dep, test |
| from autotest_lib.client.common_lib import error, logging_manager, utils |
| |
| """ a wrapper for using verity/dm-verity with a test backing store """ |
| |
# Enum of the 4 values this module defines for the dm-verity
# error_behavior table parameter.
ERROR_BEHAVIOR_ERROR = 'eio'
ERROR_BEHAVIOR_REBOOT = 'panic'
ERROR_BEHAVIOR_IGNORE = 'none'
ERROR_BEHAVIOR_NOTIFIER = 'notify'  # for platform specific behavior.

# Default configuration for verity_image
DEFAULT_TARGET_NAME = 'verity_image'
DEFAULT_ALG = 'sha1'
DEFAULT_IMAGE_SIZE_IN_BLOCKS = 100
DEFAULT_ERROR_BEHAVIOR = ERROR_BEHAVIOR_ERROR
# TODO(wad) make this configurable when dm-verity doesn't hard-code 4096.
# Block size in bytes; matches the 4096 hard-coded in the dd/mkfs command
# templates used by verity_image.
BLOCK_SIZE = 4096
| |
def system(command, timeout=None):
    """Run |command| via utils.run with stdout and stderr teed to the logs.

    |timeout| is handed straight through to utils.run. ignore_status is
    False, so a failing command is reported by utils.run rather than being
    swallowed here. stderr is flagged as expected; the original author's
    note is that stderr is therefore only logged when the command actually
    fails (NOTE(review): confirm against utils.run).

    This wrapper has no return value.
    """
    run_options = {
        'timeout': timeout,
        'ignore_status': False,
        'stdout_tee': utils.TEE_TO_LOGS,
        'stderr_tee': utils.TEE_TO_LOGS,
        'stderr_is_expected': True,
    }
    utils.run(command, **run_options)
| |
class verity_image(object):
    """A helper for creating dm-verity targets for testing.

    To use,
      vi = verity_image()
      vi.initialize(self.tmpdir, "dmveritytesta")
      # Create a 100 block (409600 byte) image with an ext3 filesystem.
      # The path of the backing file is returned.
      backing_path = vi.create_backing_image(100)
      # Performs hashing of the backing_path and sets up a loop device.
      loop_dev = vi.prepare_backing_device()
      # Sets up the mapped device and returns the path:
      # E.g., /dev/mapper/autotest_dmveritytesta
      dev = vi.create_verity_device()
      # Access the mapped device using the returned string.

    TODO(wad) add direct verified and backing store access functions
              to make writing modifiers easier (e.g., mmap).
    """
    # Define the command template constants.
    verity_cmd = \
        'verity mode=create alg=%s payload=%s payload_blocks=%d hashtree=%s'
    dd_cmd = 'dd if=/dev/zero of=%s bs=4096 count=0 seek=%d'
    mkfs_cmd = 'mkfs.ext3 -b 4096 -F %s'
    dmsetup_cmd = "dmsetup -r create autotest_%s --table '%s'"

    def _device_release(self, cmd, device):
        """Runs |cmd| to release |device|, retrying once if it fails.

        Between the two attempts the output of fuser and lsblk for |device|
        is collected and the call sleeps for one second.

        Raises:
            error.TestFail: if |cmd| exits non-zero on both attempts. The
                message carries the collected fuser and lsblk output.
        """
        if utils.system(cmd, ignore_status=True) == 0:
            return
        logging.warning("Could not release %s. Retrying...", device)
        # Other things (like cros-disks) may have the device open briefly,
        # so if we initially fail, try again and attempt to gather details
        # on who else is using the device.
        fuser = utils.system_output('fuser -v %s' % (device),
                                    retain_output=True,
                                    ignore_status=True)
        lsblk = utils.system_output('lsblk %s' % (device),
                                    retain_output=True,
                                    ignore_status=True)
        time.sleep(1)
        if utils.system(cmd, ignore_status=True) == 0:
            return
        raise error.TestFail('"%s" failed: %s\n%s' % (cmd, fuser, lsblk))

    @staticmethod
    def _remove_if_present(path):
        """Deletes |path|, treating an already-missing file as success.

        initialize() only reserves file names; the files themselves appear
        later (if at all), so reset() must tolerate their absence. Any
        other OSError is re-raised.
        """
        import errno
        try:
            os.remove(path)
        except OSError as e:
            if e.errno != errno.ENOENT:
                raise

    def reset(self):
        """Idempotent call which will free any claimed system resources.

        Unmounts the mountpoint, removes the device-mapper target, detaches
        the loop device and deletes the image and hash files, in that
        order, skipping whichever of them is not set. Afterwards every
        configurable attribute is back at its module default.

        Raises:
            error.TestFail: from _device_release() if the mapper or loop
                device cannot be released.
        """
        # Pre-initialize these values to None so that reset() can be
        # called on a brand new object (see __init__).
        for attr in ('mountpoint', 'device', 'loop', 'file', 'hash_file'):
            if not hasattr(self, attr):
                setattr(self, attr, None)
        logging.info("verity_image is being reset")

        if self.mountpoint is not None:
            system('umount %s' % self.mountpoint)
            self.mountpoint = None

        if self.device is not None:
            self._device_release('dmsetup remove %s' % (self.device),
                                 self.device)
            self.device = None

        if self.loop is not None:
            self._device_release('losetup -d %s' % (self.loop), self.loop)
            self.loop = None

        if self.file is not None:
            self._remove_if_present(self.file)
            self.file = None

        if self.hash_file is not None:
            self._remove_if_present(self.hash_file)
            self.hash_file = None

        self.alg = DEFAULT_ALG
        self.error_behavior = DEFAULT_ERROR_BEHAVIOR
        self.blocks = DEFAULT_IMAGE_SIZE_IN_BLOCKS
        self.has_fs = False
        self.table = None
        self.target_name = DEFAULT_TARGET_NAME

        self.__initialized = False

    def __init__(self):
        """Sets up the defaults for the object by calling reset()."""
        self.reset()

    def __del__(self):
        # Release any and all system resources.
        self.reset()

    def _create_image(self):
        """Creates a dummy file of self.blocks 4096-byte blocks with dd."""
        # TODO(wad) replace with python
        utils.system_output(self.dd_cmd % (self.file, self.blocks))

    def _create_fs(self, copy_files):
        """Sets up ext3 on the image.

        |copy_files| is accepted for interface compatibility only: copying
        files onto the filesystem is not implemented yet.
        """
        self.has_fs = True
        system(self.mkfs_cmd % self.file)
        # TODO(wad) copy |copy_files| onto the new filesystem.
        if copy_files:
            logging.warning("verity_image: copying files onto the image is "
                            "not implemented; ignoring copy_files")

    def _hash_image(self):
        """Runs verity over the image and saves the device mapper table."""
        self.table = utils.system_output(self.verity_cmd % (self.alg,
                                                            self.file,
                                                            self.blocks,
                                                            self.hash_file))
        # The verity tool doesn't include a templated error value.
        # For now, we add one; _setup_target() fills in the placeholder.
        self.table += " error_behavior=ERROR_BEHAVIOR"
        logging.info("table is %s", self.table)

    def _append_hash(self):
        """Appends the contents of the hash file to the backing image."""
        # The with-block closes the image even if the read or write raises.
        with open(self.file, 'ab') as image:
            image.write(utils.read_file(self.hash_file))

    def _setup_loop(self):
        """Attaches the backing image to a free loop device."""
        self.loop = utils.system_output('losetup -f --show %s' % (self.file))

    def _setup_target(self):
        """Fills in the table placeholders and creates the mapper device."""
        # Both the hash tree and the payload live on the same loop device
        # because _append_hash() concatenated them into one file.
        self.table = self.table.replace('HASH_DEV', self.loop)
        self.table = self.table.replace('ROOT_DEV', self.loop)
        self.table = self.table.replace('ERROR_BEHAVIOR', self.error_behavior)

        system(self.dmsetup_cmd % (self.target_name, self.table))
        self.device = "/dev/mapper/autotest_%s" % self.target_name

    def initialize(self,
                   tmpdir,
                   target_name,
                   alg=DEFAULT_ALG,
                   size_in_blocks=DEFAULT_IMAGE_SIZE_IN_BLOCKS,
                   error_behavior=DEFAULT_ERROR_BEHAVIOR):
        """Performs any required system-level initialization before use.

        Checks that the external commands used by this class are available,
        then records the configuration and reserves names under |tmpdir|
        for the image and hash files. The files are not created here.

        Returns:
            False if os_dep.commands() raises ValueError for the required
            commands (the object is left unconfigured), True otherwise.
        """
        try:
            os_dep.commands('losetup', 'mkfs.ext3', 'dmsetup', 'verity', 'dd',
                            'dumpe2fs')
        except ValueError as e:
            logging.error('verity_image cannot be used without: %s', e)
            return False

        # Used for the mapper device name and the tmpfile names.
        self.target_name = target_name

        # Reserve some files to use.
        # NOTE(review): os.tempnam only picks a name (racy) and exists only
        # in Python 2; revisit if this file is ported.
        self.file = os.tempnam(tmpdir, '%s.img.' % self.target_name)
        self.hash_file = os.tempnam(tmpdir, '%s.hash.' % self.target_name)

        # Set up the configurable bits.
        self.alg = alg
        self.error_behavior = error_behavior
        self.blocks = size_in_blocks

        self.__initialized = True
        return True

    def create_backing_image(self, size_in_blocks, with_fs=True,
                             copy_files=None):
        """Creates an image file of the given number of blocks and, if
        |with_fs| is True, creates an ext3 filesystem on it.

        |copy_files| is meant to list files to copy onto the filesystem;
        that copy is not implemented yet, so a warning is logged when it is
        non-empty.

        Returns:
            The path of the backing image file.
        """
        self.blocks = size_in_blocks
        self._create_image()

        if with_fs is True:
            self._create_fs(copy_files)
        elif copy_files:
            logging.warning("verity_image.create_backing_image called with "
                            "files to copy but no fs")

        return self.file

    def prepare_backing_device(self):
        """Hashes the backing image, appends the hash to the backing image,
        points a loop device at it and returns the path to the loop."""
        self._hash_image()
        self._append_hash()
        self._setup_loop()
        return self.loop

    def create_verity_device(self):
        """Sets up the device mapper node and returns its path."""
        self._setup_target()
        return self.device

    def verifiable(self):
        """Returns True if the dm-verity device does not throw any errors
        when being walked completely or False if it does.

        "Walking" means running dumpe2fs (only when a filesystem was
        created) followed by a dd of the whole device to /dev/null; an
        error.CmdError from either command yields False.
        """
        try:
            if self.has_fs:
                system('dumpe2fs %s' % self.device)
            # TODO(wad) replace with mmap.mmap-based access
            system('dd if=%s of=/dev/null bs=4096' % self.device)
            return True
        except error.CmdError:
            return False
| |
| |
class VerityImageTest(test.test):
    """VerityImageTest provides a base class for verity_image tests
    to be derived from. It sets up a verity_image object for use
    and provides the function mod_and_test() to wrap simple test
    cases for verity_images.

    See platform_DMVerityCorruption as an example usage.
    """
    version = 1
    # Number of blocks in the backing image built for every run; passed to
    # the modifier callback as its block_count argument.
    image_blocks = DEFAULT_IMAGE_SIZE_IN_BLOCKS

    def initialize(self):
        """Overrides test.initialize() to setup a verity_image"""
        self.verity = verity_image()

    def mod_nothing(self, run_count, backing_path, block_size, block_count):
        """Example callback for mod_and_test that does nothing.

        Shows the signature a modifier must accept: the zero-based run
        index, the backing image path, the block size in bytes and the
        number of blocks in the image.
        """
        pass

    def mod_and_test(self, modifier, count, expected):
        """Takes in a callback |modifier| and runs it |count| times over
        the verified image checking for |expected| out of verity.verifiable()

        Each run rebuilds the image from scratch, calls |modifier| after the
        hash has been appended and the loop device attached but before the
        verity device is created, and then walks the verity device.

        Raises:
            error.TestError: if the verity_image could not be initialized
                (a required tool is missing).
            error.TestFail: if verity.verifiable() is not |expected|.
        """
        for tries in range(count):
            # Start fresh then modify each block in the image.
            self.verity.reset()
            # initialize() reports missing tools by returning False. Going
            # on regardless would run with a backing path of None, and the
            # missing tool could show up as verifiable() == False, which
            # would look like a detected corruption.
            if not self.verity.initialize(self.tmpdir,
                                          self.__class__.__name__):
                raise error.TestError(
                        '%s: verity_image could not be initialized' %
                        modifier.__name__)
            backing_path = self.verity.create_backing_image(self.image_blocks)
            self.verity.prepare_backing_device()

            modifier(tries,
                     backing_path,
                     BLOCK_SIZE,
                     self.image_blocks)

            self.verity.create_verity_device()

            # Now check for failure.
            if self.verity.verifiable() is not expected:
                raise error.TestFail(
                        '%s: verity.verifiable() not as expected (%s)' %
                        (modifier.__name__, expected))