Fixing further linter warnings. No functional changes.
Test: out/host/linux-x86/nativetest/libavb_host_unittest/libavb_host_unittest
Change-Id: I6a97f9c3c0418d461c0244c8312bac65353b9a02
diff --git a/avbtool b/avbtool
index f892be9..75c9653 100755
--- a/avbtool
+++ b/avbtool
@@ -349,6 +349,9 @@
Arguments:
key_path: The path to a key file.
+
+ Raises:
+ AvbError: If RSA key parameters could not be read from file.
"""
# We used to have something as simple as this:
#
@@ -401,6 +404,9 @@
Returns:
A bytearray() with the |AvbRSAPublicKeyHeader|.
+
+ Raises:
+ AvbError: If the given RSA key exponent is not 65537.
"""
key = RSAPublicKey(key_path)
if key.exponent != 65537:
@@ -436,6 +442,7 @@
return (alg_name, alg_data)
raise AvbError('Unknown algorithm type {}'.format(alg_type))
+
def lookup_hash_size_by_type(alg_type):
"""Looks up hash size by type.
@@ -454,6 +461,7 @@
return alg_data.hash_num_bytes
raise AvbError('Unsupported algorithm type {}'.format(alg_type))
+
def raw_sign(signing_helper, signing_helper_with_files,
algorithm_name, signature_num_bytes, key_path,
raw_data_to_sign):
@@ -509,8 +517,7 @@
def verify_vbmeta_signature(vbmeta_header, vbmeta_blob):
- """Checks that the signature in a vbmeta blob was made by
- the embedded public key.
+ """Checks that signature in a vbmeta blob was made by the embedded public key.
Arguments:
vbmeta_header: A AvbVBMetaHeader.
@@ -519,6 +526,10 @@
Returns:
True if the signature is valid and corresponds to the embedded
public key. Also returns True if the vbmeta blob is not signed.
+
+ Raises:
+ AvbError: If there are errors calling out to the openssl command
+ during signature verification.
"""
(_, alg) = lookup_algorithm_by_type(vbmeta_header.algorithm_type)
if alg.hash_name == '':
@@ -945,6 +956,9 @@
Arguments:
offset: Offset to seek to from the beginning of the file.
+
+ Raises:
+ RuntimeError: If the given offset is negative.
"""
if offset < 0:
raise RuntimeError('Seeking with negative offset: %d' % offset)
@@ -1918,7 +1932,7 @@
# Android Firmware Transparency Log Data Structures
-# AFTL section header
+
class AvbIcpHeader(object):
"""A class for parsing and writing AVB vbmeta images with transparency
log inclusion proof information.
@@ -1933,9 +1947,9 @@
SIZE = 18 # The size of the structure, in bytes
MAGIC = 'AFTL'
- FORMAT_STRING = ('!4s2L' # magic, major & minor version
- 'L' # algorithm type for transparency log
- 'H') # number of inclusion proof entries
+ FORMAT_STRING = ('!4s2L' # magic, major & minor version
+ 'L' # algorithm type for transparency log
+ 'H') # number of inclusion proof entries
def __init__(self, data=None):
"""Initializes a new transparency header object.
@@ -1946,8 +1960,9 @@
assert struct.calcsize(self.FORMAT_STRING) == self.SIZE
if data:
- (self.magic, self.required_icp_version_major, self.required_icp_version_minor,
- self.algorithm, self.icp_count) = struct.unpack(self.FORMAT_STRING, data)
+ (self.magic, self.required_icp_version_major,
+ self.required_icp_version_minor, self.algorithm,
+ self.icp_count) = struct.unpack(self.FORMAT_STRING, data)
else:
self.magic = self.MAGIC
self.required_icp_version_major = AVB_VERSION_MAJOR
@@ -1973,6 +1988,7 @@
self.required_icp_version_major,
self.required_icp_version_minor,
self.algorithm, self.icp_count)
+
def is_valid(self):
"""Ensures that values in an AvbIcpHeader structure are sane.
@@ -1980,36 +1996,42 @@
True if the values in the AvbIcpHeader are sane, False otherwise.
"""
if self.magic != AvbIcpHeader.MAGIC:
- sys.stderr.write('ICP Header: magic value mismatch: {}\n'.format(self.magic))
+ sys.stderr.write(
+ 'ICP Header: magic value mismatch: {}\n'.format(self.magic))
return False
if self.required_icp_version_major > AVB_VERSION_MAJOR:
- sys.stderr.write('ICP header: major version mismatch: {}\n'.format(self.required_icp_version_major))
+ sys.stderr.write('ICP header: major version mismatch: {}\n'.format(
+ self.required_icp_version_major))
return False
if self.required_icp_version_minor > AVB_VERSION_MINOR:
- sys.stderr.write('ICP header: minor version mismatch: {}\n'.format(self.required_icp_version_minor))
+ sys.stderr.write('ICP header: minor version mismatch: {}\n'.format(
+ self.required_icp_version_minor))
return False
if self.algorithm < 0 or self.algorithm >= len(ALGORITHMS):
- sys.stderr.write('ICP header: algorithm identifier out of range: {}\n'.format(self.algorithm))
+ sys.stderr.write(
+ 'ICP header: algorithm identifier out of range: {}\n'.format(
+ self.algorithm))
return False
if self.icp_count < 0:
- sys.stderr.write('ICP header: ICP entry count out of range: {}\n'.format(self.icp_count))
+ sys.stderr.write(
+ 'ICP header: ICP entry count out of range: {}\n'.format(
+ self.icp_count))
return False
return True
-# AFTL ICP entry header
-# All data required to verify an inclusion proof is stored as raw data just beyond this header.
-# If there are multiple ICPs, each one has an entry header, followed by the raw data.
class AvbIcpEntry(object):
"""A class for parsing and writing AFTL inclusion proof information.
- The data that represents each of the components of the ICP entry are stored immediately following the ICP entry header.
- The format is log_url, SignedLogRoot, and inclusion proof hashes.
+ The data that represents each of the components of the ICP entry are stored
+ immediately following the ICP entry header. The format is log_url,
+ SignedLogRoot, and inclusion proof hashes.
Attributes:
log_url_size: Length of the string representing the transparency log URL.
leaf_index: Leaf index in the transparency log representing this entry.
- signed_root_blob_size: Size of the SignedLogRoot for the transparency log; treat as an opaque blob for now.
+ signed_root_blob_size: Size of the SignedLogRoot for the transparency log;
+ treat as an opaque blob for now.
proof_hash_count: Number of hashes comprising the inclusion proof.
proof_size: The total size of the inclusion proof, in bytes.
next_entry: 1 if there is a next entry, 0 otherwise.
@@ -2032,7 +2054,8 @@
if data:
(self.log_url_size, self.leaf_index, self.signed_root_blob_size,
- self.proof_hash_count, self.proof_size, self.next_entry) = struct.unpack(self.FORMAT_STRING, data)
+ self.proof_hash_count, self.proof_size,
+ self.next_entry) = struct.unpack(self.FORMAT_STRING, data)
else:
self.log_url_size = 0
self.leaf_index = 0
@@ -2067,30 +2090,40 @@
True if the values in the AvbIcpEntry are sane, False otherwise.
"""
if self.log_url_size < 0:
- sys.stderr.write('ICP entry: invalid transparency log URL size: {}\n'.format(self.log_url_size))
+ sys.stderr.write(
+ 'ICP entry: invalid transparency log URL size: {}\n'.format(
+ self.log_url_size))
return False
if self.leaf_index < 0:
- sys.stderr.write('ICP entry: leaf index out of range: {}\n'.format(self.leaf_index))
+ sys.stderr.write('ICP entry: leaf index out of range: {}\n'.format(
+ self.leaf_index))
return False
if self.signed_root_blob_size < 0:
- sys.stderr.write('ICP entry: invalid signed root blob size: {}\n'.format(self.signed_root_blob_size))
+ sys.stderr.write('ICP entry: invalid signed root blob size: {}\n'.format(
+ self.signed_root_blob_size))
return False
if self.proof_hash_count < 0:
- sys.stderr.write('ICP entry: invalid proof count: {}\n'.format(self.proof_hash_count))
+ sys.stderr.write('ICP entry: invalid proof count: {}\n'.format(
+ self.proof_hash_count))
return False
if self.proof_size < 0:
- sys.stderr.write('ICP entry: invalid transparency log proof size: {}\n'.format(self.proof_size))
+ sys.stderr.write(
+ 'ICP entry: invalid transparency log proof size: {}\n'.format(
+ self.proof_size))
return False
if self.next_entry != 0 and self.next_entry != 1:
- sys.stderr.write('ICP entry: invalid next entry value: {}\n'.format(self.next_entry))
+ sys.stderr.write('ICP entry: invalid next entry value: {}\n'.format(
+ self.next_entry))
return False
return True
+
def validate_icp_blob(icp_blob):
"""Ensures that values in an ICP section are structurally sane.
Arguments:
- A bytearray() that contains an ICP, with headers, entries and data.
+ icp_blob: A bytearray() that contains an ICP, with headers, entries and
+ data.
Returns:
True if the values in the ICP section are sane, False otherwise.
@@ -2112,7 +2145,7 @@
for i in range(icp_count):
# get the entry header from the ICP blob
cur_icp_entry_blob = icp_blob[icp_index:(icp_index+AvbIcpEntry.SIZE)]
- cur_icp_entry = AvbIcpEntry(cur_icp_entry_blob)
+ cur_icp_entry = AvbIcpEntry(cur_icp_entry_blob)
# now validate the entry structure
if not cur_icp_entry.is_valid():
@@ -2121,28 +2154,32 @@
# jump past the entry header to the data blob
icp_index += AvbIcpEntry.SIZE
# extract each value from the data blob:
- transparency_log_url = icp_blob[icp_index:(icp_index + cur_icp_entry.log_url_size)]
+ transparency_log_url = icp_blob[icp_index:(icp_index
+ + cur_icp_entry.log_url_size)]
icp_index += cur_icp_entry.log_url_size
sth = icp_blob[icp_index:(icp_index+cur_icp_entry.signed_root_blob_size)]
icp_index += cur_icp_entry.signed_root_blob_size
if cur_icp_entry.proof_size / cur_icp_entry.proof_hash_count != proof_hash_size:
- sys.stderr.write('Bad proof size: {} in ICP entry {}'.format(cur_icp_entry.proof_size, i))
+ sys.stderr.write('Bad proof size: {} in ICP entry {}'.format(
+ cur_icp_entry.proof_size, i))
return False
proof_size = icp_blob[icp_index:(icp_index+cur_icp_entry.proof_size)]
icp_index += cur_icp_entry.proof_size
# check if there is a next entry
if cur_icp_entry.next_entry == 0:
if i != icp_count - 1:
- sys.stderr.write("ICP entry count mismatch\n")
+ sys.stderr.write('ICP entry count mismatch\n')
return False
break
if icp_index != len(icp_blob):
- sys.stderr.write("ICP blob size mismatch, expected {}, got {}\n".format(len(icp_blob), icp_index))
+ sys.stderr.write('ICP blob size mismatch, expected {}, got {}\n'.format(
+ len(icp_blob), icp_index))
return False
return True
# AFTL structure test functions
+
def test_default_icp_header():
"""Performs validation on a default ICP header structure.
@@ -2158,9 +2195,14 @@
sys.stdout.write('ICP header: validation failed\n')
return False
+
def test_icp_header(algorithm, icp_count):
"""Create an ICP header structure and attempt to validate it.
+ Arguments:
+ algorithm: The algorithm to be used.
+ icp_count: Number of ICPs that follow the ICP header.
+
Returns:
True if the tests pass, False otherwise.
"""
@@ -2175,6 +2217,7 @@
sys.stdout.write('ICP header: validation failed\n')
return False
+
def test_default_icp_entry():
"""Performs validation on an empty ICP entry structure.
@@ -2190,7 +2233,9 @@
sys.stdout.write('ICP entry validation failed\n')
return False
-def test_icp_entry(log_url_size, leaf_index, signed_root_blob_size, proof_hash_count, proof_size, next_entry):
+
+def test_icp_entry(log_url_size, leaf_index, signed_root_blob_size,
+ proof_hash_count, proof_size, next_entry):
"""Create an ICP entry structure and attempt to validate it.
Returns:
@@ -2211,6 +2256,7 @@
sys.stdout.write('ICP entry validation failed\n')
return False
+
def test_generate_icp_images():
"""Test cases for full AFTL ICP structure generation.
@@ -2256,7 +2302,7 @@
icp_blob.extend(icp_data)
if not validate_icp_blob(icp_blob):
return False
- # now add a 2nd entry (this should fail)
+ # Now add a 2nd entry (this should fail).
tl_url2 = 'aftl-test-server.google.ch'
sth2 = bytearray()
sth2.extend('f' * 192)
@@ -2280,7 +2326,7 @@
icp_blob.extend(icp_data2)
if validate_icp_blob(icp_blob):
return False
- #now fix the entries so this passes
+ # Now fix the entries so this passes.
icp_header.icp_count = 2
icp_entry.next_entry = 1
icp_blob2 = bytearray()
@@ -2293,9 +2339,11 @@
return False
return True
+
def test_icp_image():
"""The main function that handles testing all AFTL related components.
- Currently performs tests for validation of AFTL related structures.
+
+ Currently performs tests for validation of AFTL related structures.
Returns:
True if all tests pass, False otherwise.
@@ -2335,6 +2383,7 @@
sys.stdout.write('All AFTL ICP tests passed.\n')
return True
+
class AvbVBMetaHeader(object):
"""A class for parsing and writing AVB vbmeta images.
@@ -2623,7 +2672,7 @@
'block size {}.'.format(partition_size,
image.block_size))
- (footer, vbmeta_header, descriptors, _) = self._parse_image(image)
+ (footer, _, _, _) = self._parse_image(image)
if not footer:
raise AvbError('Given image does not have a footer.')
@@ -2754,10 +2803,12 @@
the --expected_chain_partition option
accept_zeroed_hashtree: If True, don't fail if hashtree or FEC data is
zeroed out.
+
+ Raises:
+ AvbError: If verification of the image fails.
"""
expected_chain_partitions_map = {}
if expected_chain_partitions:
- used_locations = {}
for cp in expected_chain_partitions:
cp_tokens = cp.split(':')
if len(cp_tokens) != 3:
@@ -2782,7 +2833,7 @@
image_filename)
image = ImageHandler(image_filename)
- (footer, header, descriptors, image_size) = self._parse_image(image)
+ (footer, header, descriptors, _) = self._parse_image(image)
offset = 0
if footer:
offset = footer.vbmeta_offset
@@ -2852,7 +2903,7 @@
image_ext = os.path.splitext(image_filename)[1]
image = ImageHandler(image_filename)
- (footer, header, descriptors, image_size) = self._parse_image(image)
+ (footer, header, descriptors, _) = self._parse_image(image)
offset = 0
if footer:
offset = footer.vbmeta_offset
@@ -2869,8 +2920,7 @@
ch_image_filename = os.path.join(image_dir,
desc.partition_name + image_ext)
ch_image = ImageHandler(ch_image_filename)
- (ch_footer, ch_header, ch_descriptors,
- ch_image_size) = self._parse_image(ch_image)
+ (ch_footer, ch_header, _, _) = self._parse_image(ch_image)
ch_offset = 0
ch_size = (ch_header.SIZE + ch_header.authentication_data_block_size +
ch_header.auxiliary_data_block_size)
@@ -3087,7 +3137,7 @@
return self._get_cmdline_descriptors_for_hashtree_descriptor(ht)
def icp_test_suite(self):
- """ Implements a self-contained test suite for AFTL related code
+ """Implements a self-contained test suite for AFTL related code.
Returns:
True if all tests pass, False otherwise.
@@ -4452,8 +4502,9 @@
required=True)
sub_parser.set_defaults(func=self.extract_public_key)
- sub_parser = subparsers.add_parser('icp_test_suite',
- help='Test suite for ICP related functionality.')
+ sub_parser = subparsers.add_parser(
+ 'icp_test_suite',
+ help='Test suite for ICP related functionality.')
sub_parser.set_defaults(func=self.icp_test_suite)
sub_parser = subparsers.add_parser('make_vbmeta_image',