blob: 8ec332489e2f8098a85e80746858cd88dcd7d178 [file] [log] [blame]
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Exposes the FAFTClient interface over XMLRPC.
It launches a XMLRPC server and exposes the interface of FAFTClient object.
The FAFTClient object aggreates some useful functions of exisintg SAFT
libraries.
"""
import functools, os, shutil, sys
from optparse import OptionParser
from SimpleXMLRPCServer import SimpleXMLRPCServer
# Import libraries from SAFT.
sys.path.append('/usr/local/sbin/firmware/saft')
import cgpt_state, chromeos_interface, flashrom_handler, kernel_handler
import saft_flashrom_util, tpm_handler
def allow_multiple_section_input(image_operator):
@functools.wraps(image_operator)
def wrapper(self, section):
if type(section) in (tuple, list):
for sec in section:
image_operator(self, sec)
else:
image_operator(self, section)
return wrapper
class LazyFlashromHandlerProxy:
_loaded = False
_obj = None
def __init__(self, *args, **kargs):
self._args = args
self._kargs = kargs
def _load(self):
self._obj = flashrom_handler.FlashromHandler()
self._obj.init(*self._args, **self._kargs)
self._obj.new_image()
self._loaded = True
def __getattr__(self, name):
if not self._loaded:
self._load()
return getattr(self._obj, name)
def reload(self):
self._loaded = False
class FAFTClient(object):
"""A class of FAFT client which aggregates some useful functions of SAFT.
This class can be exposed via a XMLRPC server such that its functions can
be accessed remotely.
Attributes:
_chromeos_interface: An object to encapsulate OS services functions.
_bios_handler: An object to automate BIOS flashrom testing.
_ec_handler: An object to automate EC flashrom testing.
_kernel_handler: An object to provide kernel related actions.
_tpm_handler: An object to control TPM device.
_temp_path: Path of a temp directory.
_keys_path: Path of a directory, keys/, in temp directory.
_work_path: Path of a directory, work/, in temp directory.
"""
def __init__(self):
"""Initialize the data attributes of this class."""
# TODO(waihong): Move the explicit object.init() methods to the
# objects' constructors (ChromeOSInterface, FlashromHandler,
# KernelHandler, and TpmHandler).
self._chromeos_interface = chromeos_interface.ChromeOSInterface(False)
# We keep the state of FAFT test in a permanent directory over reboots.
state_dir = '/var/tmp/faft'
self._chromeos_interface.init(state_dir, log_file='/tmp/faft_log.txt')
os.chdir(state_dir)
self._bios_handler = LazyFlashromHandlerProxy(
saft_flashrom_util,
self._chromeos_interface,
None,
'/usr/share/vboot/devkeys',
'bios')
self._ec_handler = None
if not os.system("mosys ec info"):
self._ec_handler = LazyFlashromHandlerProxy(
saft_flashrom_util,
self._chromeos_interface,
'ec_root_key.vpubk',
'/usr/share/vboot/devkeys',
'ec')
self._kernel_handler = kernel_handler.KernelHandler()
# TODO(waihong): The dev_key_path is a new argument. We do that in
# order not to break the old image and still be able to run.
try:
self._kernel_handler.init(self._chromeos_interface,
dev_key_path='/usr/share/vboot/devkeys')
except:
# Copy the key to the current working directory.
shutil.copy('/usr/share/vboot/devkeys/kernel_data_key.vbprivk', '.')
self._kernel_handler.init(self._chromeos_interface)
self._tpm_handler = tpm_handler.TpmHandler()
self._tpm_handler.init(self._chromeos_interface)
self._cgpt_state = cgpt_state.CgptState(
'SHORT', self._chromeos_interface, self.get_root_dev())
# Initialize temporary directory path
self._temp_path = '/var/tmp/faft/autest'
self._keys_path = os.path.join(self._temp_path, 'keys')
self._work_path = os.path.join(self._temp_path, 'work')
def _dispatch(self, method, params):
"""This _dispatch method handles string conversion especially.
Since we turn off allow_dotted_names option. So any string conversion,
like str(FAFTClient.method), i.e. FAFTClient.method.__str__, failed
via XML RPC call.
"""
is_str = method.endswith('.__str__')
if is_str:
method = method.rsplit('.', 1)[0]
try:
func = getattr(self, method)
except AttributeError:
raise Exception('method "%s" is not supported' % method)
else:
if is_str:
return str(func)
else:
return func(*params)
def is_available(self):
"""Function for polling the RPC server availability.
Returns:
Always True.
"""
return True
def run_shell_command(self, command):
"""Run shell command.
Args:
command: A shell command to be run.
"""
self._chromeos_interface.log('Requesting run shell command')
self._chromeos_interface.run_shell_command(command)
def run_shell_command_get_output(self, command):
"""Run shell command and get its console output.
Args:
command: A shell command to be run.
Returns:
A list of strings stripped of the newline characters.
"""
self._chromeos_interface.log(
'Requesting run shell command and get its console output')
return self._chromeos_interface.run_shell_command_get_output(command)
def software_reboot(self):
"""Request software reboot."""
self._chromeos_interface.log('Requesting software reboot')
self._chromeos_interface.run_shell_command('reboot')
def get_platform_name(self):
"""Get the platform name of the current system.
Returns:
A string of the platform name.
"""
self._chromeos_interface.log('Requesting get platform name')
return self._chromeos_interface.run_shell_command_get_output(
'mosys platform name')[0]
def get_crossystem_value(self, key):
"""Get crossystem value of the requested key.
Args:
key: A crossystem key.
Returns:
A string of the requested crossystem value.
"""
self._chromeos_interface.log('Requesting get crossystem value')
return self._chromeos_interface.run_shell_command_get_output(
'crossystem %s' % key)[0]
def get_root_dev(self):
"""Get the name of root device without partition number.
Returns:
A string of the root device without partition number.
"""
self._chromeos_interface.log('Requesting get root device')
return self._chromeos_interface.get_root_dev()
def get_root_part(self):
"""Get the name of root device with partition number.
Returns:
A string of the root device with partition number.
"""
self._chromeos_interface.log('Requesting get root part')
return self._chromeos_interface.get_root_part()
def set_try_fw_b(self):
"""Set 'Try Frimware B' flag in crossystem."""
self._chromeos_interface.log('Requesting restart with firmware B')
self._chromeos_interface.cs.fwb_tries = 1
def request_recovery_boot(self):
"""Request running in recovery mode on the restart."""
self._chromeos_interface.log('Requesting restart in recovery mode')
self._chromeos_interface.cs.request_recovery()
def get_gbb_flags(self):
"""Get the GBB flags.
Returns:
An integer of the GBB flags.
"""
self._chromeos_interface.log('Getting GBB flags')
return self._bios_handler.get_gbb_flags()
def get_firmware_flags(self, section):
"""Get the preamble flags of a firmware section.
Args:
section: A firmware section, either 'a' or 'b'.
Returns:
An integer of the preamble flags.
"""
self._chromeos_interface.log('Getting preamble flags of firmware %s' %
section)
return self._bios_handler.get_section_flags(section)
def set_firmware_flags(self, section, flags):
"""Set the preamble flags of a firmware section.
Args:
section: A firmware section, either 'a' or 'b'.
flags: An integer of preamble flags.
"""
self._chromeos_interface.log(
'Setting preamble flags of firmware %s to %s' % (section, flags))
version = self.get_firmware_version(section)
self._bios_handler.set_section_version(section, version, flags,
write_through=True)
def get_EC_firmware_sha(self):
"""Get SHA1 hash of EC RW firmware section. """
return self._ec_handler.get_section_sha('rw')
def reload_firmware(self):
"""Reload the firmware image that may be changed."""
self._bios_handler.reload()
@allow_multiple_section_input
def corrupt_EC(self, section):
"""Corrupt the requested EC section signature.
Args:
section: A EC section, either 'a' or 'b'.
"""
self._chromeos_interface.log('Corrupting EC signature %s' %
section)
self._ec_handler.corrupt_firmware(section, corrupt_all=True)
@allow_multiple_section_input
def corrupt_EC_body(self, section):
"""Corrupt the requested EC section body.
Args:
section: An EC section, either 'a' or 'b'.
"""
self._chromeos_interface.log('Corrupting EC body %s' %
section)
self._ec_handler.corrupt_firmware_body(section, corrupt_all=True)
@allow_multiple_section_input
def restore_EC(self, section):
"""Restore the previously corrupted EC section signature.
Args:
section: An EC section, either 'a' or 'b'.
"""
self._chromeos_interface.log('Restoring EC signature %s' %
section)
self._ec_handler.restore_firmware(section, restore_all=True)
@allow_multiple_section_input
def restore_EC_body(self, section):
"""Restore the previously corrupted EC section body.
Args:
section: An EC section, either 'a' or 'b'.
"""
self._chromeos_interface.log('Restoring EC body %s' %
section)
self._ec_handler.restore_firmware_body(section, restore_all=True)
@allow_multiple_section_input
def corrupt_firmware(self, section):
"""Corrupt the requested firmware section signature.
Args:
section: A firmware section, either 'a' or 'b'.
"""
self._chromeos_interface.log('Corrupting firmware signature %s' %
section)
self._bios_handler.corrupt_firmware(section)
@allow_multiple_section_input
def corrupt_firmware_body(self, section):
"""Corrupt the requested firmware section body.
Args:
section: A firmware section, either 'a' or 'b'.
"""
self._chromeos_interface.log('Corrupting firmware body %s' %
section)
self._bios_handler.corrupt_firmware_body(section)
@allow_multiple_section_input
def restore_firmware(self, section):
"""Restore the previously corrupted firmware section signature.
Args:
section: A firmware section, either 'a' or 'b'.
"""
self._chromeos_interface.log('Restoring firmware signature %s' %
section)
self._bios_handler.restore_firmware(section)
@allow_multiple_section_input
def restore_firmware_body(self, section):
"""Restore the previously corrupted firmware section body.
Args:
section: A firmware section, either 'a' or 'b'.
"""
self._chromeos_interface.log('Restoring firmware body %s' %
section)
self._bios_handler.restore_firmware_body(section)
def get_firmware_version(self, section):
"""Retrieve firmware version of a section."""
return self._bios_handler.get_section_version(section)
def _modify_firmware_version(self, section, delta):
"""Modify firmware version for the requested section, by adding delta.
The passed in delta, a positive or a negative number, is added to the
original firmware version.
"""
original_version = self.get_firmware_version(section)
new_version = original_version + delta
flags = self._bios_handler.get_section_flags(section)
self._chromeos_interface.log(
'Setting firmware section %s version from %d to %d' % (
section, original_version, new_version))
self._bios_handler.set_section_version(section, new_version, flags,
write_through=True)
@allow_multiple_section_input
def move_firmware_backward(self, section):
"""Decrement firmware version for the requested section."""
self._modify_firmware_version(section, -1)
@allow_multiple_section_input
def move_firmware_forward(self, section):
"""Increase firmware version for the requested section."""
self._modify_firmware_version(section, 1)
def retrieve_firmware_version(self, section):
"""Return firmware version."""
return self._bios_handler.get_section_version(section)
def retrieve_firmware_datakey_version(self, section):
"""Return firmware data key version."""
return self._bios_handler.get_section_datakey_version(section)
def retrieve_kernel_subkey_version(self,section):
"""Return kernel subkey version."""
return self._bios_handler.get_section_kernel_subkey_version(section)
@allow_multiple_section_input
def corrupt_kernel(self, section):
"""Corrupt the requested kernel section.
Args:
section: A kernel section, either 'a' or 'b'.
"""
self._chromeos_interface.log('Corrupting kernel %s' % section)
self._kernel_handler.corrupt_kernel(section)
@allow_multiple_section_input
def restore_kernel(self, section):
"""Restore the requested kernel section (previously corrupted).
Args:
section: A kernel section, either 'a' or 'b'.
"""
self._chromeos_interface.log('restoring kernel %s' % section)
self._kernel_handler.restore_kernel(section)
def _modify_kernel_version(self, section, delta):
"""Modify kernel version for the requested section, by adding delta.
The passed in delta, a positive or a negative number, is added to the
original kernel version.
"""
original_version = self._kernel_handler.get_version(section)
new_version = original_version + delta
self._chromeos_interface.log(
'Setting kernel section %s version from %d to %d' % (
section, original_version, new_version))
self._kernel_handler.set_version(section, new_version)
@allow_multiple_section_input
def move_kernel_backward(self, section):
"""Decrement kernel version for the requested section."""
self._modify_kernel_version(section, -1)
@allow_multiple_section_input
def move_kernel_forward(self, section):
"""Increase kernel version for the requested section."""
self._modify_kernel_version(section, 1)
def diff_kernel_a_b(self):
"""Compare kernel A with B.
Returns:
True: if kernel A is different with B.
False: if kernel A is the same as B.
"""
rootdev = self._chromeos_interface.get_root_dev()
kernel_a = self._chromeos_interface.join_part(rootdev, '3')
kernel_b = self._chromeos_interface.join_part(rootdev, '5')
# The signature (some kind of hash) for the kernel body is stored in
# the beginning. So compare the first 64KB (including header, preamble,
# and signature) should be enough to check them identical.
header_a = self._chromeos_interface.read_partition(kernel_a, 0x10000)
header_b = self._chromeos_interface.read_partition(kernel_b, 0x10000)
return header_a != header_b
def run_cgpt_test_loop(self):
"""Run the CgptState test loop. The tst logic is handled in the client.
Returns:
0: there are more cgpt tests to execute.
1: no more CgptState test, finished.
"""
return self._cgpt_state.test_loop()
def set_cgpt_test_step(self, step):
"""Set the CgptState test step.
Args:
step: A test step number.
"""
self._cgpt_state.set_step(step)
def get_cgpt_test_step(self):
"""Get the CgptState test step.
Returns:
A test step number.
"""
return self._cgpt_state.get_step()
def setup_firmwareupdate_temp_dir(self):
"""Setup temporary directory.
Devkeys are copied to _key_path. Then, shellball,
/usr/sbin/chromeos-firmwareupdate, is extracted to _work_path.
"""
os.mkdir(self._temp_path)
os.chdir(self._temp_path)
os.mkdir(self._work_path)
shutil.copytree('/usr/share/vboot/devkeys/', self._keys_path)
self.run_shell_command(
'sh /usr/sbin/chromeos-firmwareupdate --sb_extract %s'
% self._work_path)
def retrieve_shellball_fwid(self):
"""Retrieve shellball's fwid.
This method should be called after setup_firmwareupdate_temp_dir.
Returns:
Shellball's fwid.
"""
self.run_shell_command('dump_fmap -x %s %s' %
(os.path.join(self._work_path, 'bios.bin'),
'RW_FWID_A'))
[fwid] = self.run_shell_command_get_output(
'eu-strings RW_FWID_A')
return fwid
def cleanup_firmwareupdate_temp_dir(self):
"""Cleanup temporary directory."""
shutil.rmtree(self._temp_path)
def repack_firmwareupdate_shellball(self, append):
"""Repack shellball with new fwid.
New fwid follows the rule: [orignal_fwid]-[append].
Args:
append: use for new fwid naming.
"""
shutil.copy('/usr/sbin/chromeos-firmwareupdate', '%s' %
os.path.join(self._temp_path,
'chromeos-firmwareupdate-%s' % append))
self.run_shell_command(
'sh %schromeos-firmwareupdate-%s --sb_repack %s'
% (self._temp_path, append, self._work_path))
args = ['-i']
args.append('"s/TARGET_FWID=\\"\\(.*\\)\\"/TARGET_FWID=\\"\\1.%s\\"/g"'
% append)
args.append('%s'
% os.path.join(self._temp_path,
'chromeos-firmwareupdate-%s' % append))
cmd = 'sed %s' % ' '.join(args)
self.run_shell_command(cmd)
args = ['-i']
args.append('"s/TARGET_UNSTABLE=\\".*\\"/TARGET_UNSTABLE=\\"\\"/g"')
args.append('%s'
% os.path.join(self._temp_path,
'chromeos-firmwareupdate-%s' % append))
cmd = 'sed %s' % ' '.join(args)
self.run_shell_command(cmd)
def resign_firmware(self, version):
"""Resign firmware with version.
Args:
version: new firmware version number.
"""
args = [os.path.join(self._work_path, 'bios.bin')]
args.append(os.path.join(self._temp_path, 'output.bin'))
args.append(os.path.join(self._keys_path, 'firmware_data_key.vbprivk'))
args.append(os.path.join(self._keys_path, 'firmware.keyblock'))
args.append(os.path.join(self._keys_path,
'dev_firmware_data_key.vbprivk'))
args.append(os.path.join(self._keys_path, 'dev_firmware.keyblock'))
args.append(os.path.join(self._keys_path, 'kernel_subkey.vbpubk'))
args.append('%d' % version)
args.append('1')
cmd = '/usr/share/vboot/bin/resign_firmwarefd.sh %s' % ' '.join(args)
self.run_shell_command(cmd)
shutil.copyfile('%s' % os.path.join(self._temp_path, 'output.bin'),
'%s' % os.path.join(self._work_path, 'bios.bin'))
def run_firmware_autoupdate(self, append):
"""Do firmwareupdate with autoupdate mode using new shellball.
Args:
append: decide which shellball to use with format
chromeos-firmwareupdate-[append]
"""
self.run_shell_command(
'/bin/sh %s --mode autoupdate --noupdate_ec'
% os.path.join(self._temp_path,
'chromeos-firmwareupdate-%s' % append))
def run_firmware_bootok(self, append):
"""Do bootok mode using new shellball.
Copy firmware B to firmware A if reboot success.
"""
self.run_shell_command(
'/bin/sh %s --mode bootok' % os.path.join(self._temp_path,
'chromeos-firmwareupdate-%s' % append))
def run_firmware_recovery(self):
"""Recovery to original shellball."""
args = ['/usr/sbin/chromeos-firmwareupdate']
args.append('--mode recovery')
args.append('--noupdate_ec')
cmd = '/bin/sh %s' % ' '.join(args)
self.run_shell_command(cmd)
def get_temp_path(self):
"""Get temporary directory path."""
return self._temp_path
def get_keys_path(self):
"""Get temporary ke path."""
return self._keys_path
def cleanup(self):
"""Cleanup for the RPC server. Currently nothing."""
pass
def main():
parser = OptionParser(usage='Usage: %prog [options]')
parser.add_option('--port', type='int', dest='port', default=9990,
help='port number of XMLRPC server')
(options, args) = parser.parse_args()
faft_client = FAFTClient()
# Launch the XMLRPC server to provide FAFTClient commands.
server = SimpleXMLRPCServer(('localhost', options.port), allow_none=True,
logRequests=False)
server.register_introspection_functions()
server.register_instance(faft_client)
print 'XMLRPC Server: Serving FAFTClient on port %s' % options.port
server.serve_forever()
if __name__ == '__main__':
main()